1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
24 "git.arvados.org/arvados.git/sdk/go/arvados"
25 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
26 "git.arvados.org/arvados.git/sdk/go/arvadostest"
30 func Test(t *testing.T) {
31 DefaultRetryDelay = 50 * time.Millisecond
35 // Gocheck boilerplate
36 var _ = Suite(&ServerRequiredSuite{})
37 var _ = Suite(&StandaloneSuite{})
39 // Tests that require the Keep server running
40 type ServerRequiredSuite struct{}
43 type StandaloneSuite struct{}
45 var origHOME = os.Getenv("HOME")
47 func (s *StandaloneSuite) SetUpTest(c *C) {
48 RefreshServiceDiscovery()
49 // Prevent cache state from leaking between test cases
50 os.Setenv("HOME", c.MkDir())
53 func (s *StandaloneSuite) TearDownTest(c *C) {
54 os.Setenv("HOME", origHOME)
57 func pythonDir() string {
59 return fmt.Sprintf("%s/../../python/tests", cwd)
62 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
63 arvadostest.StartKeep(2, false)
66 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
67 arvadostest.StopKeep(2)
68 os.Setenv("HOME", origHOME)
71 func (s *ServerRequiredSuite) SetUpTest(c *C) {
72 RefreshServiceDiscovery()
73 // Prevent cache state from leaking between test cases
74 os.Setenv("HOME", c.MkDir())
77 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
78 arv, err := arvadosclient.MakeArvadosClient()
81 kc, err := MakeKeepClient(arv)
84 c.Check(len(kc.LocalRoots()), Equals, 2)
85 for _, root := range kc.LocalRoots() {
86 c.Check(root, Matches, "http://localhost:\\d+")
90 func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) {
91 arv, err := arvadosclient.MakeArvadosClient()
94 cc, err := arv.ClusterConfig("StorageClasses")
97 c.Assert(cc.(map[string]interface{})["default"], NotNil)
100 c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"})
103 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
104 arv, err := arvadosclient.MakeArvadosClient()
107 kc, err := MakeKeepClient(arv)
109 c.Assert(kc.Want_replicas, Equals, 2)
111 arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
112 kc, err = MakeKeepClient(arv)
114 c.Assert(kc.Want_replicas, Equals, 3)
116 arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
117 kc, err = MakeKeepClient(arv)
119 c.Assert(kc.Want_replicas, Equals, 1)
122 type StubPutHandler struct {
125 expectAPIToken string
127 expectStorageClass string
128 returnStorageClasses string
130 requests []*http.Request
134 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
136 sph.requests = append(sph.requests, req)
138 sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
139 sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
140 if sph.expectStorageClass != "*" {
141 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
143 body, err := ioutil.ReadAll(req.Body)
144 sph.c.Check(err, IsNil)
145 sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
146 resp.Header().Set("X-Keep-Replicas-Stored", "1")
147 if sph.returnStorageClasses != "" {
148 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
150 resp.WriteHeader(200)
151 sph.handled <- fmt.Sprintf("http://%s", req.Host)
154 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
156 // If we don't explicitly bind it to localhost, ks.listener.Addr() will
157 // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
158 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
160 panic("Could not listen on any port")
162 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
163 go http.Serve(ks.listener, st)
167 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
168 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
170 ks := RunFakeKeepServer(st)
171 defer ks.listener.Close()
173 arv, _ := arvadosclient.MakeArvadosClient()
174 arv.ApiToken = "abc123"
176 kc, _ := MakeKeepClient(arv)
178 reader, writer := io.Pipe()
179 uploadStatusChan := make(chan uploadStatus)
181 f(kc, ks.url, reader, writer, uploadStatusChan)
184 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
185 log.Printf("TestUploadToStubKeepServer")
187 st := &StubPutHandler{
189 expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
190 expectAPIToken: "abc123",
192 expectStorageClass: "",
193 returnStorageClasses: "default=1",
194 handled: make(chan string),
197 UploadToStubHelper(c, st,
198 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
199 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
201 writer.Write([]byte("foo"))
205 status := <-uploadStatusChan
206 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
210 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
211 st := &StubPutHandler{
213 expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
214 expectAPIToken: "abc123",
216 expectStorageClass: "",
217 returnStorageClasses: "default=1",
218 handled: make(chan string),
221 UploadToStubHelper(c, st,
222 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
223 go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
227 status := <-uploadStatusChan
228 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
232 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
233 for _, trial := range []struct {
235 expectMap map[string]int
238 {"foo=1", map[string]int{"foo": 1}},
239 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
243 st := &StubPutHandler{
245 expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
246 expectAPIToken: "abc123",
248 expectStorageClass: "",
249 returnStorageClasses: trial.respHeader,
250 handled: make(chan string),
253 UploadToStubHelper(c, st,
254 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
255 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
257 writer.Write([]byte("foo"))
261 status := <-uploadStatusChan
262 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
267 func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) {
269 for _, trial := range []struct {
271 clientClasses []string
277 // Talking to an older cluster (no default storage classes exported
278 // config) and no other additional storage classes requirements.
279 {1, nil, nil, 1, 1, true},
280 {2, nil, nil, 2, 2, true},
281 {3, nil, nil, 3, 3, true},
282 {nServers*2 + 1, nil, nil, nServers, nServers, false},
284 {1, []string{"class1"}, nil, 1, 1, true},
285 {2, []string{"class1"}, nil, 2, 2, true},
286 {3, []string{"class1"}, nil, 3, 3, true},
287 {1, []string{"class1", "class2"}, nil, 1, 1, true},
288 {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
290 {1, nil, []string{"class1"}, 1, 1, true},
291 {2, nil, []string{"class1"}, 2, 2, true},
292 {3, nil, []string{"class1"}, 3, 3, true},
293 {1, nil, []string{"class1", "class2"}, 1, 1, true},
294 {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false},
297 st := &StubPutHandler{
299 expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
300 expectAPIToken: "abc123",
302 expectStorageClass: "*",
303 returnStorageClasses: "", // Simulate old cluster without SC keep support
304 handled: make(chan string, 100),
306 ks := RunSomeFakeKeepServers(st, nServers)
307 arv, _ := arvadosclient.MakeArvadosClient()
308 kc, _ := MakeKeepClient(arv)
309 kc.Want_replicas = trial.replicas
310 kc.StorageClasses = trial.clientClasses
311 kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults
312 arv.ApiToken = "abc123"
313 localRoots := make(map[string]string)
314 writableLocalRoots := make(map[string]string)
315 for i, k := range ks {
316 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
317 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
318 defer k.listener.Close()
320 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
322 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
324 StorageClasses: trial.putClasses,
331 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
332 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
333 if trial.clientClasses == nil && trial.putClasses == nil {
334 c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "")
339 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
341 for _, trial := range []struct {
343 defaultClasses []string
344 clientClasses []string // clientClasses takes precedence over defaultClasses
345 putClasses []string // putClasses takes precedence over clientClasses
350 {1, []string{"class1"}, nil, nil, 1, 1, true},
351 {2, []string{"class1"}, nil, nil, 1, 2, true},
352 {3, []string{"class1"}, nil, nil, 2, 3, true},
353 {1, []string{"class1", "class2"}, nil, nil, 1, 1, true},
355 // defaultClasses doesn't matter when any of the others is specified.
356 {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true},
357 {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true},
358 {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true},
359 {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true},
360 {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true},
361 {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true},
362 {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
363 {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
364 {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false},
365 {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false},
366 {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false},
367 {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false},
370 st := &StubPutHandler{
372 expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
373 expectAPIToken: "abc123",
375 expectStorageClass: "*",
376 returnStorageClasses: "class1=2, class2=2",
377 handled: make(chan string, 100),
379 ks := RunSomeFakeKeepServers(st, nServers)
380 arv, _ := arvadosclient.MakeArvadosClient()
381 kc, _ := MakeKeepClient(arv)
382 kc.Want_replicas = trial.replicas
383 kc.StorageClasses = trial.clientClasses
384 kc.DefaultStorageClasses = trial.defaultClasses
385 arv.ApiToken = "abc123"
386 localRoots := make(map[string]string)
387 writableLocalRoots := make(map[string]string)
388 for i, k := range ks {
389 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
390 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
391 defer k.listener.Close()
393 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
395 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
397 StorageClasses: trial.putClasses,
404 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
405 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
406 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) {
407 // Max concurrency should be 1. First request
408 // should have succeeded for class1. Second
409 // request should only ask for class404.
410 c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404")
415 type FailHandler struct {
419 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
420 resp.WriteHeader(500)
421 fh.handled <- fmt.Sprintf("http://%s", req.Host)
424 type FailThenSucceedHandler struct {
425 morefails int // fail 1 + this many times before succeeding
428 successhandler http.Handler
432 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
433 fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
434 if int(fh.count.Add(1)) <= fh.morefails+1 {
435 resp.WriteHeader(500)
436 fh.handled <- fmt.Sprintf("http://%s", req.Host)
438 fh.successhandler.ServeHTTP(resp, req)
442 type Error404Handler struct {
446 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
447 resp.WriteHeader(404)
448 fh.handled <- fmt.Sprintf("http://%s", req.Host)
451 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
455 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
457 UploadToStubHelper(c, st,
458 func(kc *KeepClient, url string, reader io.ReadCloser,
459 writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
461 go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
463 writer.Write([]byte("foo"))
468 status := <-uploadStatusChan
469 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
470 c.Check(status.statusCode, Equals, 500)
474 type KeepServer struct {
475 listener net.Listener
479 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
480 ks = make([]KeepServer, n)
482 for i := 0; i < n; i++ {
483 ks[i] = RunFakeKeepServer(st)
489 func (s *StandaloneSuite) TestPutB(c *C) {
490 hash := Md5String("foo")
492 st := &StubPutHandler{
495 expectAPIToken: "abc123",
497 expectStorageClass: "default",
498 returnStorageClasses: "",
499 handled: make(chan string, 5),
502 arv, _ := arvadosclient.MakeArvadosClient()
503 kc, _ := MakeKeepClient(arv)
506 arv.ApiToken = "abc123"
507 localRoots := make(map[string]string)
508 writableLocalRoots := make(map[string]string)
510 ks := RunSomeFakeKeepServers(st, 5)
512 for i, k := range ks {
513 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
514 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
515 defer k.listener.Close()
518 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
520 kc.PutB([]byte("foo"))
522 shuff := NewRootSorter(
523 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
527 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
528 (s1 == shuff[1] && s2 == shuff[0]),
533 func (s *StandaloneSuite) TestPutHR(c *C) {
534 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
536 st := &StubPutHandler{
539 expectAPIToken: "abc123",
541 expectStorageClass: "default",
542 returnStorageClasses: "",
543 handled: make(chan string, 5),
546 arv, _ := arvadosclient.MakeArvadosClient()
547 kc, _ := MakeKeepClient(arv)
550 arv.ApiToken = "abc123"
551 localRoots := make(map[string]string)
552 writableLocalRoots := make(map[string]string)
554 ks := RunSomeFakeKeepServers(st, 5)
556 for i, k := range ks {
557 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
558 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
559 defer k.listener.Close()
562 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
564 kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)
566 shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
571 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
572 (s1 == shuff[1] && s2 == shuff[0]),
577 func (s *StandaloneSuite) TestPutWithFail(c *C) {
578 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
580 st := &StubPutHandler{
583 expectAPIToken: "abc123",
585 expectStorageClass: "default",
586 returnStorageClasses: "",
587 handled: make(chan string, 4),
591 make(chan string, 1)}
593 arv, err := arvadosclient.MakeArvadosClient()
595 kc, _ := MakeKeepClient(arv)
598 arv.ApiToken = "abc123"
599 localRoots := make(map[string]string)
600 writableLocalRoots := make(map[string]string)
602 ks1 := RunSomeFakeKeepServers(st, 4)
603 ks2 := RunSomeFakeKeepServers(fh, 1)
605 for i, k := range ks1 {
606 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
607 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
608 defer k.listener.Close()
610 for i, k := range ks2 {
611 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
612 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
613 defer k.listener.Close()
616 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
618 shuff := NewRootSorter(
619 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
622 phash, replicas, err := kc.PutB([]byte("foo"))
627 c.Check(phash, Equals, "")
628 c.Check(replicas, Equals, 2)
633 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
634 (s1 == shuff[2] && s2 == shuff[1]),
639 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
640 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
642 st := &StubPutHandler{
645 expectAPIToken: "abc123",
647 expectStorageClass: "default",
648 returnStorageClasses: "",
649 handled: make(chan string, 1),
653 make(chan string, 4)}
655 arv, err := arvadosclient.MakeArvadosClient()
657 kc, _ := MakeKeepClient(arv)
661 arv.ApiToken = "abc123"
662 localRoots := make(map[string]string)
663 writableLocalRoots := make(map[string]string)
665 ks1 := RunSomeFakeKeepServers(st, 1)
666 ks2 := RunSomeFakeKeepServers(fh, 4)
668 for i, k := range ks1 {
669 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
670 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
671 defer k.listener.Close()
673 for i, k := range ks2 {
674 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
675 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
676 defer k.listener.Close()
679 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
681 _, replicas, err := kc.PutB([]byte("foo"))
683 c.Check(err, FitsTypeOf, InsufficientReplicasError{})
684 c.Check(replicas, Equals, 1)
685 c.Check(<-st.handled, Equals, ks1[0].url)
688 type StubGetHandler struct {
691 expectAPIToken string
696 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
697 sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
698 sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
699 resp.WriteHeader(sgh.httpStatus)
700 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
704 func (s *StandaloneSuite) TestGet(c *C) {
705 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
707 st := StubGetHandler{
714 ks := RunFakeKeepServer(st)
715 defer ks.listener.Close()
717 arv, err := arvadosclient.MakeArvadosClient()
719 kc, _ := MakeKeepClient(arv)
720 arv.ApiToken = "abc123"
721 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
723 r, n, _, err := kc.Get(hash)
725 c.Check(n, Equals, int64(3))
727 content, err2 := ioutil.ReadAll(r)
729 c.Check(content, DeepEquals, []byte("foo"))
730 c.Check(r.Close(), IsNil)
733 func (s *StandaloneSuite) TestGet404(c *C) {
734 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
736 st := Error404Handler{make(chan string, 1)}
738 ks := RunFakeKeepServer(st)
739 defer ks.listener.Close()
741 arv, err := arvadosclient.MakeArvadosClient()
743 kc, _ := MakeKeepClient(arv)
744 arv.ApiToken = "abc123"
745 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
747 r, n, _, err := kc.Get(hash)
748 c.Check(err, Equals, BlockNotFound)
749 c.Check(n, Equals, int64(0))
753 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
754 st := Error404Handler{make(chan string, 1)}
756 ks := RunFakeKeepServer(st)
757 defer ks.listener.Close()
759 arv, err := arvadosclient.MakeArvadosClient()
761 kc, _ := MakeKeepClient(arv)
762 arv.ApiToken = "abc123"
763 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
765 r, n, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
767 c.Check(n, Equals, int64(0))
769 buf, err := ioutil.ReadAll(r)
771 c.Check(buf, DeepEquals, []byte{})
772 c.Check(r.Close(), IsNil)
775 func (s *StandaloneSuite) TestGetFail(c *C) {
776 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
778 st := FailHandler{make(chan string, 1)}
780 ks := RunFakeKeepServer(st)
781 defer ks.listener.Close()
783 arv, err := arvadosclient.MakeArvadosClient()
785 kc, _ := MakeKeepClient(arv)
786 arv.ApiToken = "abc123"
787 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
790 r, n, _, err := kc.Get(hash)
791 errNotFound, _ := err.(*ErrNotFound)
792 if c.Check(errNotFound, NotNil) {
793 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
794 c.Check(errNotFound.Temporary(), Equals, true)
796 c.Check(n, Equals, int64(0))
800 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
801 defer func(origDefault, origMinimum time.Duration) {
802 DefaultRetryDelay = origDefault
803 MinimumRetryDelay = origMinimum
804 }(DefaultRetryDelay, MinimumRetryDelay)
805 DefaultRetryDelay = time.Second / 8
806 MinimumRetryDelay = time.Millisecond
808 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
810 for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
811 c.Logf("=== initial delay %v", delay)
813 st := &FailThenSucceedHandler{
815 handled: make(chan string, 4),
816 successhandler: StubGetHandler{
823 ks := RunFakeKeepServer(st)
824 defer ks.listener.Close()
826 arv, err := arvadosclient.MakeArvadosClient()
828 kc, _ := MakeKeepClient(arv)
829 arv.ApiToken = "abc123"
830 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
832 kc.RetryDelay = delay
833 kc.DiskCacheSize = DiskCacheDisabled
836 r, n, _, err := kc.Get(hash)
838 c.Check(n, Equals, int64(3))
839 elapsed := time.Since(t0)
841 nonsleeptime := time.Second / 10
842 expect := kc.RetryDelay
844 expect = DefaultRetryDelay
846 min := MinimumRetryDelay * 3
847 max := expect + expect*2 + expect*2*2 + nonsleeptime
848 c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
849 c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
851 content, err := ioutil.ReadAll(r)
853 c.Check(content, DeepEquals, []byte("foo"))
854 c.Check(r.Close(), IsNil)
856 c.Logf("%q", st.reqIDs)
857 if c.Check(st.reqIDs, Not(HasLen), 0) {
858 for _, reqid := range st.reqIDs {
859 c.Check(reqid, Not(Equals), "")
860 c.Check(reqid, Equals, st.reqIDs[0])
866 func (s *StandaloneSuite) TestGetNetError(c *C) {
867 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
869 arv, err := arvadosclient.MakeArvadosClient()
871 kc, _ := MakeKeepClient(arv)
872 arv.ApiToken = "abc123"
873 kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
875 r, n, _, err := kc.Get(hash)
876 errNotFound, _ := err.(*ErrNotFound)
877 if c.Check(errNotFound, NotNil) {
878 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
879 c.Check(errNotFound.Temporary(), Equals, true)
881 c.Check(n, Equals, int64(0))
885 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
886 uuid := "zzzzz-bi6l4-123451234512345"
887 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
889 // This one shouldn't be used:
890 ks0 := RunFakeKeepServer(StubGetHandler{
896 defer ks0.listener.Close()
897 // This one should be used:
898 ks := RunFakeKeepServer(StubGetHandler{
904 defer ks.listener.Close()
906 arv, err := arvadosclient.MakeArvadosClient()
908 kc, _ := MakeKeepClient(arv)
909 arv.ApiToken = "abc123"
911 map[string]string{"x": ks0.url},
913 map[string]string{uuid: ks.url})
915 r, n, _, err := kc.Get(hash + "+K@" + uuid)
917 c.Check(n, Equals, int64(3))
919 content, err := ioutil.ReadAll(r)
921 c.Check(content, DeepEquals, []byte("foo"))
922 c.Check(r.Close(), IsNil)
925 // Use a service hint to fetch from a local disk service, overriding
926 // rendezvous probe order.
927 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
928 uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
929 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
931 // This one shouldn't be used, although it appears first in
932 // rendezvous probe order:
933 ks0 := RunFakeKeepServer(StubGetHandler{
937 http.StatusBadGateway,
939 defer ks0.listener.Close()
940 // This one should be used:
941 ks := RunFakeKeepServer(StubGetHandler{
947 defer ks.listener.Close()
949 arv, err := arvadosclient.MakeArvadosClient()
951 kc, _ := MakeKeepClient(arv)
952 arv.ApiToken = "abc123"
955 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
956 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
957 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
961 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
962 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
963 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
967 r, n, _, err := kc.Get(hash + "+K@" + uuid)
969 c.Check(n, Equals, int64(3))
971 content, err := ioutil.ReadAll(r)
973 c.Check(content, DeepEquals, []byte("foo"))
974 c.Check(r.Close(), IsNil)
977 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
978 uuid := "zzzzz-bi6l4-123451234512345"
979 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
981 ksLocal := RunFakeKeepServer(StubGetHandler{
987 defer ksLocal.listener.Close()
988 ksGateway := RunFakeKeepServer(StubGetHandler{
992 http.StatusInternalServerError,
994 defer ksGateway.listener.Close()
996 arv, err := arvadosclient.MakeArvadosClient()
998 kc, _ := MakeKeepClient(arv)
999 arv.ApiToken = "abc123"
1001 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
1003 map[string]string{uuid: ksGateway.url})
1005 r, n, _, err := kc.Get(hash + "+K@" + uuid)
1006 c.Assert(err, IsNil)
1007 c.Check(n, Equals, int64(3))
1009 content, err := ioutil.ReadAll(r)
1011 c.Check(content, DeepEquals, []byte("foo"))
1012 c.Check(r.Close(), IsNil)
1015 type BarHandler struct {
1019 func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1020 resp.Write([]byte("bar"))
1021 h.handled <- fmt.Sprintf("http://%s", req.Host)
1024 func (s *StandaloneSuite) TestChecksum(c *C) {
1025 foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1026 barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
1028 st := BarHandler{make(chan string, 1)}
1030 ks := RunFakeKeepServer(st)
1031 defer ks.listener.Close()
1033 arv, err := arvadosclient.MakeArvadosClient()
1035 kc, _ := MakeKeepClient(arv)
1036 arv.ApiToken = "abc123"
1037 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1039 r, n, _, err := kc.Get(barhash)
1040 if c.Check(err, IsNil) {
1041 _, err = ioutil.ReadAll(r)
1042 c.Check(n, Equals, int64(3))
1048 case <-time.After(time.Second):
1049 c.Fatal("timed out")
1052 r, n, _, err = kc.Get(foohash)
1054 buf, readerr := ioutil.ReadAll(r)
1058 c.Check(err, Equals, BadChecksum)
1062 case <-time.After(time.Second):
1063 c.Fatal("timed out")
1067 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
1068 content := []byte("waz")
1069 hash := fmt.Sprintf("%x+3", md5.Sum(content))
1071 fh := Error404Handler{
1072 make(chan string, 4)}
1074 st := StubGetHandler{
1081 arv, err := arvadosclient.MakeArvadosClient()
1083 kc, _ := MakeKeepClient(arv)
1084 arv.ApiToken = "abc123"
1085 localRoots := make(map[string]string)
1086 writableLocalRoots := make(map[string]string)
1088 ks1 := RunSomeFakeKeepServers(st, 1)
1089 ks2 := RunSomeFakeKeepServers(fh, 4)
1091 for i, k := range ks1 {
1092 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1093 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1094 defer k.listener.Close()
1096 for i, k := range ks2 {
1097 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1098 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1099 defer k.listener.Close()
1102 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1105 // This test works only if one of the failing services is
1106 // attempted before the succeeding service. Otherwise,
1107 // <-fh.handled below will just hang! (Probe order depends on
1108 // the choice of block content "waz" and the UUIDs of the fake
1109 // servers, so we just tried different strings until we found
1110 // an example that passes this Assert.)
1111 c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
1113 r, n, _, err := kc.Get(hash)
1117 case <-time.After(time.Second):
1118 c.Fatal("timed out")
1120 c.Assert(err, IsNil)
1121 c.Check(n, Equals, int64(3))
1123 readContent, err2 := ioutil.ReadAll(r)
1124 c.Check(err2, IsNil)
1125 c.Check(readContent, DeepEquals, content)
1126 c.Check(r.Close(), IsNil)
1129 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
1130 content := []byte("TestPutGetHead")
1132 arv, err := arvadosclient.MakeArvadosClient()
1134 kc, err := MakeKeepClient(arv)
1135 c.Assert(err, IsNil)
1137 hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content))
1140 n, _, err := kc.Ask(hash)
1141 c.Check(err, Equals, BlockNotFound)
1142 c.Check(n, Equals, int64(0))
1145 hash2, replicas, err := kc.PutB(content)
1147 c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`)
1148 c.Check(replicas, Equals, 2)
1151 r, n, _, err := kc.Get(hash)
1153 c.Check(n, Equals, int64(len(content)))
1154 if c.Check(r, NotNil) {
1155 readContent, err := ioutil.ReadAll(r)
1157 if c.Check(len(readContent), Equals, len(content)) {
1158 c.Check(readContent, DeepEquals, content)
1160 c.Check(r.Close(), IsNil)
1164 n, url2, err := kc.Ask(hash)
1166 c.Check(n, Equals, int64(len(content)))
1167 c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E")
1170 loc, err := kc.LocalLocator(hash)
1172 c.Assert(len(loc) >= 32, Equals, true)
1173 c.Check(loc[:32], Equals, hash[:32])
1176 content := []byte("the perth county conspiracy")
1177 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1178 c.Check(loc, Equals, "")
1179 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1180 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
1184 type StubProxyHandler struct {
1188 func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1189 resp.Header().Set("X-Keep-Replicas-Stored", "2")
1190 h.handled <- fmt.Sprintf("http://%s", req.Host)
1193 func (s *StandaloneSuite) TestPutProxy(c *C) {
1194 st := StubProxyHandler{make(chan string, 1)}
1196 arv, err := arvadosclient.MakeArvadosClient()
1198 kc, _ := MakeKeepClient(arv)
1200 kc.Want_replicas = 2
1201 arv.ApiToken = "abc123"
1202 localRoots := make(map[string]string)
1203 writableLocalRoots := make(map[string]string)
1205 ks1 := RunSomeFakeKeepServers(st, 1)
1207 for i, k := range ks1 {
1208 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1209 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1210 defer k.listener.Close()
1213 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1215 _, replicas, err := kc.PutB([]byte("foo"))
1219 c.Check(replicas, Equals, 2)
1222 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1223 st := StubProxyHandler{make(chan string, 1)}
1225 arv, err := arvadosclient.MakeArvadosClient()
1227 kc, _ := MakeKeepClient(arv)
1229 kc.Want_replicas = 3
1230 arv.ApiToken = "abc123"
1231 localRoots := make(map[string]string)
1232 writableLocalRoots := make(map[string]string)
1234 ks1 := RunSomeFakeKeepServers(st, 1)
1236 for i, k := range ks1 {
1237 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1238 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1239 defer k.listener.Close()
1241 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1243 _, replicas, err := kc.PutB([]byte("foo"))
1246 c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1247 c.Check(replicas, Equals, 2)
1250 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1251 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1253 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1254 c.Check(l.Size, Equals, 3)
1255 c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
1258 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1259 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1261 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1262 c.Check(l.Size, Equals, -1)
1263 c.Check(l.Hints, DeepEquals, []string{})
1266 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1267 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1269 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1270 c.Check(l.Size, Equals, -1)
1271 c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
1274 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1275 str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1276 l, err := MakeLocator(str)
1278 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1279 c.Check(l.Size, Equals, 3)
1280 c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1281 c.Check(l.String(), Equals, str)
1284 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1285 _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1286 c.Check(err, Equals, InvalidLocatorError)
1289 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1290 hash := Md5String("foo")
1292 st := &StubPutHandler{
1295 expectAPIToken: "abc123",
1297 expectStorageClass: "default",
1298 returnStorageClasses: "",
1299 handled: make(chan string, 5),
1302 arv, _ := arvadosclient.MakeArvadosClient()
1303 kc, _ := MakeKeepClient(arv)
1305 kc.Want_replicas = 2
1306 arv.ApiToken = "abc123"
1307 localRoots := make(map[string]string)
1308 writableLocalRoots := make(map[string]string)
1310 ks := RunSomeFakeKeepServers(st, 5)
1312 for i, k := range ks {
1313 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1315 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1317 defer k.listener.Close()
1320 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1322 _, replicas, err := kc.PutB([]byte("foo"))
1324 c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1325 c.Check(replicas, Equals, 1)
1327 c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1330 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1331 hash := Md5String("foo")
1333 st := &StubPutHandler{
1336 expectAPIToken: "abc123",
1338 expectStorageClass: "",
1339 returnStorageClasses: "",
1340 handled: make(chan string, 5),
1343 arv, _ := arvadosclient.MakeArvadosClient()
1344 kc, _ := MakeKeepClient(arv)
1346 kc.Want_replicas = 2
1347 arv.ApiToken = "abc123"
1348 localRoots := make(map[string]string)
1349 writableLocalRoots := make(map[string]string)
1351 ks := RunSomeFakeKeepServers(st, 5)
1353 for i, k := range ks {
1354 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1355 defer k.listener.Close()
1358 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1360 _, replicas, err := kc.PutB([]byte("foo"))
1362 c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1363 c.Check(replicas, Equals, 0)
1366 type StubGetIndexHandler struct {
1369 expectAPIToken string
1374 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1375 h.c.Check(req.URL.Path, Equals, h.expectPath)
1376 h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1377 resp.WriteHeader(h.httpStatus)
1378 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1382 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1383 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1385 st := StubGetIndexHandler{
1390 []byte(hash + " 1443559274\n\n")}
1392 ks := RunFakeKeepServer(st)
1393 defer ks.listener.Close()
1395 arv, err := arvadosclient.MakeArvadosClient()
1396 c.Assert(err, IsNil)
1397 kc, err := MakeKeepClient(arv)
1398 c.Assert(err, IsNil)
1399 arv.ApiToken = "abc123"
1400 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1402 r, err := kc.GetIndex("x", "")
1405 content, err2 := ioutil.ReadAll(r)
1406 c.Check(err2, IsNil)
1407 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1410 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1411 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1413 st := StubGetIndexHandler{
1415 "/index/" + hash[0:3],
1418 []byte(hash + " 1443559274\n\n")}
1420 ks := RunFakeKeepServer(st)
1421 defer ks.listener.Close()
1423 arv, err := arvadosclient.MakeArvadosClient()
1425 kc, _ := MakeKeepClient(arv)
1426 arv.ApiToken = "abc123"
1427 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1429 r, err := kc.GetIndex("x", hash[0:3])
1430 c.Assert(err, IsNil)
1432 content, err2 := ioutil.ReadAll(r)
1433 c.Check(err2, IsNil)
1434 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1437 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1438 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1440 st := StubGetIndexHandler{
1442 "/index/" + hash[0:3],
1447 ks := RunFakeKeepServer(st)
1448 defer ks.listener.Close()
1450 arv, err := arvadosclient.MakeArvadosClient()
1452 kc, _ := MakeKeepClient(arv)
1453 arv.ApiToken = "abc123"
1454 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1456 _, err = kc.GetIndex("x", hash[0:3])
1457 c.Check(err, Equals, ErrIncompleteIndex)
1460 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1461 hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1463 st := StubGetIndexHandler{
1465 "/index/" + hash[0:3],
1470 ks := RunFakeKeepServer(st)
1471 defer ks.listener.Close()
1473 arv, err := arvadosclient.MakeArvadosClient()
1475 kc, _ := MakeKeepClient(arv)
1476 arv.ApiToken = "abc123"
1477 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1479 _, err = kc.GetIndex("y", hash[0:3])
1480 c.Check(err, Equals, ErrNoSuchKeepServer)
1483 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1484 st := StubGetIndexHandler{
1491 ks := RunFakeKeepServer(st)
1492 defer ks.listener.Close()
1494 arv, err := arvadosclient.MakeArvadosClient()
1496 kc, _ := MakeKeepClient(arv)
1497 arv.ApiToken = "abc123"
1498 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1500 r, err := kc.GetIndex("x", "abcd")
1503 content, err2 := ioutil.ReadAll(r)
1504 c.Check(err2, IsNil)
1505 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1508 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1509 defer func(origDefault, origMinimum time.Duration) {
1510 DefaultRetryDelay = origDefault
1511 MinimumRetryDelay = origMinimum
1512 }(DefaultRetryDelay, MinimumRetryDelay)
1513 DefaultRetryDelay = time.Second / 8
1514 MinimumRetryDelay = time.Millisecond
1516 for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
1517 c.Logf("=== initial delay %v", delay)
1519 st := &FailThenSucceedHandler{
1520 morefails: 5, // handler will fail 6x in total, 3 for each server
1521 handled: make(chan string, 10),
1522 successhandler: &StubPutHandler{
1524 expectPath: Md5String("foo"),
1525 expectAPIToken: "abc123",
1527 expectStorageClass: "default",
1528 returnStorageClasses: "",
1529 handled: make(chan string, 5),
1533 arv, _ := arvadosclient.MakeArvadosClient()
1534 kc, _ := MakeKeepClient(arv)
1536 kc.RetryDelay = delay
1537 kc.DiskCacheSize = DiskCacheDisabled
1538 kc.Want_replicas = 2
1540 arv.ApiToken = "abc123"
1541 localRoots := make(map[string]string)
1542 writableLocalRoots := make(map[string]string)
1544 ks := RunSomeFakeKeepServers(st, 2)
1546 for i, k := range ks {
1547 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1548 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1549 defer k.listener.Close()
1552 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1555 hash, replicas, err := kc.PutB([]byte("foo"))
1558 c.Check(hash, Equals, "")
1559 c.Check(replicas, Equals, 2)
1560 elapsed := time.Since(t0)
1562 nonsleeptime := time.Second / 10
1563 expect := kc.RetryDelay
1565 expect = DefaultRetryDelay
1567 min := MinimumRetryDelay * 3
1568 max := expect + expect*2 + expect*2*2
1570 c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
1571 c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
1575 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1576 arv, err := arvadosclient.MakeArvadosClient()
1577 c.Assert(err, IsNil)
1579 // Add an additional "testblobstore" keepservice
1580 blobKeepService := make(arvadosclient.Dict)
1581 err = arv.Create("keep_services",
1582 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1583 "service_host": "localhost",
1584 "service_port": "21321",
1585 "service_type": "testblobstore"}},
1587 c.Assert(err, IsNil)
1588 defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1589 RefreshServiceDiscovery()
1591 // Make a keepclient and ensure that the testblobstore is included
1592 kc, err := MakeKeepClient(arv)
1593 c.Assert(err, IsNil)
1595 // verify kc.LocalRoots
1596 c.Check(len(kc.LocalRoots()), Equals, 3)
1597 for _, root := range kc.LocalRoots() {
1598 c.Check(root, Matches, "http://localhost:\\d+")
1600 c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1602 // verify kc.GatewayRoots
1603 c.Check(len(kc.GatewayRoots()), Equals, 3)
1604 for _, root := range kc.GatewayRoots() {
1605 c.Check(root, Matches, "http://localhost:\\d+")
1607 c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1609 // verify kc.WritableLocalRoots
1610 c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1611 for _, root := range kc.WritableLocalRoots() {
1612 c.Check(root, Matches, "http://localhost:\\d+")
1614 c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1616 c.Assert(kc.replicasPerService, Equals, 0)
1617 c.Assert(kc.foundNonDiskSvc, Equals, true)
1618 c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
1621 func (s *StandaloneSuite) TestDelayCalculator(c *C) {
1622 defer func(origDefault, origMinimum time.Duration) {
1623 DefaultRetryDelay = origDefault
1624 MinimumRetryDelay = origMinimum
1625 }(DefaultRetryDelay, MinimumRetryDelay)
1627 checkInterval := func(d, min, max time.Duration) {
1628 c.Check(d >= min, Equals, true)
1629 c.Check(d <= max, Equals, true)
1632 MinimumRetryDelay = time.Second / 2
1633 DefaultRetryDelay = time.Second
1634 dc := delayCalculator{InitialMaxDelay: 0}
1635 checkInterval(dc.Next(), time.Second/2, time.Second)
1636 checkInterval(dc.Next(), time.Second/2, time.Second*2)
1637 checkInterval(dc.Next(), time.Second/2, time.Second*4)
1638 checkInterval(dc.Next(), time.Second/2, time.Second*8)
1639 checkInterval(dc.Next(), time.Second/2, time.Second*10)
1640 checkInterval(dc.Next(), time.Second/2, time.Second*10)
1642 // Enforce non-zero InitialMaxDelay
1643 dc = delayCalculator{InitialMaxDelay: time.Second}
1644 checkInterval(dc.Next(), time.Second/2, time.Second*2)
1645 checkInterval(dc.Next(), time.Second/2, time.Second*4)
1646 checkInterval(dc.Next(), time.Second/2, time.Second*8)
1647 checkInterval(dc.Next(), time.Second/2, time.Second*16)
1648 checkInterval(dc.Next(), time.Second/2, time.Second*20)
1649 checkInterval(dc.Next(), time.Second/2, time.Second*20)
1651 // Enforce MinimumRetryDelay
1652 dc = delayCalculator{InitialMaxDelay: time.Millisecond}
1653 checkInterval(dc.Next(), time.Second/2, time.Second/2)
1654 checkInterval(dc.Next(), time.Second/2, time.Second)
1655 checkInterval(dc.Next(), time.Second/2, time.Second*2)
1656 checkInterval(dc.Next(), time.Second/2, time.Second*4)
1657 checkInterval(dc.Next(), time.Second/2, time.Second*8)
1658 checkInterval(dc.Next(), time.Second/2, time.Second*10)
1659 checkInterval(dc.Next(), time.Second/2, time.Second*10)
1661 // If InitialMaxDelay is less than MinimumRetryDelay/10, then
1662 // delay is always MinimumRetryDelay.
1663 dc = delayCalculator{InitialMaxDelay: time.Millisecond}
1664 for i := 0; i < 20; i++ {
1665 c.Check(dc.Next(), Equals, time.Second/2)