X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f5162efc3578d0420e869e3fcbda46454a855909..33bb622ec2846f9e3c788655f42d7a7f25a4651c:/sdk/go/keepclient/keepclient_test.go diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index 62268fa463..4cfe1a0896 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -17,6 +17,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "testing" "time" @@ -24,11 +25,10 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/arvadostest" . "gopkg.in/check.v1" - check "gopkg.in/check.v1" ) -// Gocheck boilerplate func Test(t *testing.T) { + DefaultRetryDelay = 50 * time.Millisecond TestingT(t) } @@ -42,8 +42,16 @@ type ServerRequiredSuite struct{} // Standalone tests type StandaloneSuite struct{} +var origHOME = os.Getenv("HOME") + func (s *StandaloneSuite) SetUpTest(c *C) { RefreshServiceDiscovery() + // Prevent cache state from leaking between test cases + os.Setenv("HOME", c.MkDir()) +} + +func (s *StandaloneSuite) TearDownTest(c *C) { + os.Setenv("HOME", origHOME) } func pythonDir() string { @@ -52,35 +60,49 @@ func pythonDir() string { } func (s *ServerRequiredSuite) SetUpSuite(c *C) { - arvadostest.StartAPI() arvadostest.StartKeep(2, false) } func (s *ServerRequiredSuite) TearDownSuite(c *C) { arvadostest.StopKeep(2) - arvadostest.StopAPI() + os.Setenv("HOME", origHOME) } func (s *ServerRequiredSuite) SetUpTest(c *C) { RefreshServiceDiscovery() + // Prevent cache state from leaking between test cases + os.Setenv("HOME", c.MkDir()) } func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) { arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) kc, err := MakeKeepClient(arv) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) c.Check(len(kc.LocalRoots()), Equals, 2) for _, root := range kc.LocalRoots() { c.Check(root, Matches, "http://localhost:\\d+") } } +func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) { + arv, err := arvadosclient.MakeArvadosClient() + c.Assert(err, IsNil) + + cc, err := arv.ClusterConfig("StorageClasses") + c.Assert(err, IsNil) + c.Assert(cc, NotNil) + c.Assert(cc.(map[string]interface{})["default"], NotNil) + + kc := New(arv) + c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"}) +} + func (s *ServerRequiredSuite) TestDefaultReplications(c *C) { arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) kc, err := MakeKeepClient(arv) c.Check(err, IsNil) @@ -119,7 +141,7 @@ func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass) } body, err := ioutil.ReadAll(req.Body) - sph.c.Check(err, Equals, nil) + sph.c.Check(err, IsNil) sph.c.Check(body, DeepEquals, []byte(sph.expectBody)) resp.Header().Set("X-Keep-Replicas-Stored", "1") if sph.returnStorageClasses != "" { @@ -135,7 +157,7 @@ func RunFakeKeepServer(st http.Handler) (ks KeepServer) { // bind to 0.0.0.0 or [::] which is not a valid address for Dial() ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0}) if err != nil { - panic(fmt.Sprintf("Could not listen on any port")) + panic("Could not listen on any port") } ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String()) go http.Serve(ks.listener, st) @@ -242,28 +264,107 @@ func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) { } } -func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) { +func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) { nServers := 5 for _, trial := range []struct { replicas int clientClasses []string - putClasses []string // putClasses takes precedence over clientClasses + putClasses []string minRequests int maxRequests int success bool }{ + // Talking to an older cluster (no default storage classes exported + // config) and no other additional storage classes requirements. + {1, nil, nil, 1, 1, true}, + {2, nil, nil, 2, 2, true}, + {3, nil, nil, 3, 3, true}, + {nServers*2 + 1, nil, nil, nServers, nServers, false}, + {1, []string{"class1"}, nil, 1, 1, true}, - {2, []string{"class1"}, nil, 1, 2, true}, - {3, []string{"class1"}, nil, 2, 3, true}, + {2, []string{"class1"}, nil, 2, 2, true}, + {3, []string{"class1"}, nil, 3, 3, true}, {1, []string{"class1", "class2"}, nil, 1, 1, true}, - {3, nil, []string{"class1"}, 2, 3, true}, - {1, nil, []string{"class1", "class2"}, 1, 1, true}, - {1, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true}, - {1, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false}, {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false}, - {1, []string{"class404"}, nil, nServers, nServers, false}, - {1, []string{"class1", "class404"}, nil, nServers, nServers, false}, - {1, nil, []string{"class1", "class404"}, nServers, nServers, false}, + + {1, nil, []string{"class1"}, 1, 1, true}, + {2, nil, []string{"class1"}, 2, 2, true}, + {3, nil, []string{"class1"}, 3, 3, true}, + {1, nil, []string{"class1", "class2"}, 1, 1, true}, + {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false}, + } { + c.Logf("%+v", trial) + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "*", + returnStorageClasses: "", // Simulate old cluster without SC keep support + handled: make(chan string, 100), + } + ks := RunSomeFakeKeepServers(st, nServers) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Want_replicas = trial.replicas + kc.StorageClasses = trial.clientClasses + kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + + _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Data: []byte("foo"), + StorageClasses: trial.putClasses, + }) + if trial.success { + c.Check(err, IsNil) + } else { + c.Check(err, NotNil) + } + c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) + c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) + if trial.clientClasses == nil && trial.putClasses == nil { + c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "") + } + } +} + +func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) { + nServers := 5 + for _, trial := range []struct { + replicas int + defaultClasses []string + clientClasses []string // clientClasses takes precedence over defaultClasses + putClasses []string // putClasses takes precedence over clientClasses + minRequests int + maxRequests int + success bool + }{ + {1, []string{"class1"}, nil, nil, 1, 1, true}, + {2, []string{"class1"}, nil, nil, 1, 2, true}, + {3, []string{"class1"}, nil, nil, 2, 3, true}, + {1, []string{"class1", "class2"}, nil, nil, 1, 1, true}, + + // defaultClasses doesn't matter when any of the others is specified. + {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true}, + {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true}, + {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true}, + {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true}, + {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true}, + {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false}, + {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false}, } { c.Logf("%+v", trial) st := &StubPutHandler{ @@ -280,6 +381,7 @@ func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) { kc, _ := MakeKeepClient(arv) kc.Want_replicas = trial.replicas kc.StorageClasses = trial.clientClasses + kc.DefaultStorageClasses = trial.defaultClasses arv.ApiToken = "abc123" localRoots := make(map[string]string) writableLocalRoots := make(map[string]string) @@ -295,17 +397,17 @@ func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) { StorageClasses: trial.putClasses, }) if trial.success { - c.Check(err, check.IsNil) + c.Check(err, IsNil) } else { - c.Check(err, check.NotNil) + c.Check(err, NotNil) } - c.Check(len(st.handled) >= trial.minRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) - c.Check(len(st.handled) <= trial.maxRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) - if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, check.Equals, true) { + c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) + c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) + if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) { // Max concurrency should be 1. First request // should have succeeded for class1. Second // request should only ask for class404. - c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), check.Equals, "class404") + c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404") } } } @@ -320,17 +422,17 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { } type FailThenSucceedHandler struct { + morefails int // fail 1 + this many times before succeeding handled chan string - count int + count atomic.Int64 successhandler http.Handler reqIDs []string } func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id")) - if fh.count == 0 { + if int(fh.count.Add(1)) <= fh.morefails+1 { resp.WriteHeader(500) - fh.count++ fh.handled <- fmt.Sprintf("http://%s", req.Host) } else { fh.successhandler.ServeHTTP(resp, req) @@ -392,7 +494,7 @@ func (s *StandaloneSuite) TestPutB(c *C) { expectPath: hash, expectAPIToken: "abc123", expectBody: "foo", - expectStorageClass: "", + expectStorageClass: "default", returnStorageClasses: "", handled: make(chan string, 5), } @@ -436,7 +538,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) { expectPath: hash, expectAPIToken: "abc123", expectBody: "foo", - expectStorageClass: "", + expectStorageClass: "default", returnStorageClasses: "", handled: make(chan string, 5), } @@ -459,14 +561,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) { kc.SetServiceRoots(localRoots, writableLocalRoots, nil) - reader, writer := io.Pipe() - - go func() { - writer.Write([]byte("foo")) - writer.Close() - }() - - kc.PutHR(hash, reader, 3) + kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3) shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots() @@ -487,7 +582,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { expectPath: hash, expectAPIToken: "abc123", expectBody: "foo", - expectStorageClass: "", + expectStorageClass: "default", returnStorageClasses: "", handled: make(chan string, 4), } @@ -528,7 +623,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { <-fh.handled - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(phash, Equals, "") c.Check(replicas, Equals, 2) @@ -549,7 +644,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { expectPath: hash, expectAPIToken: "abc123", expectBody: "foo", - expectStorageClass: "", + expectStorageClass: "default", returnStorageClasses: "", handled: make(chan string, 1), } @@ -607,7 +702,7 @@ func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) } func (s *StandaloneSuite) TestGet(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetHandler{ c, @@ -625,19 +720,18 @@ func (s *StandaloneSuite) TestGet(c *C) { arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) - r, n, url2, err := kc.Get(hash) - defer r.Close() - c.Check(err, Equals, nil) + r, n, _, err := kc.Get(hash) + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash)) content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) + c.Check(err2, IsNil) c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) } func (s *StandaloneSuite) TestGet404(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := Error404Handler{make(chan string, 1)} @@ -650,11 +744,10 @@ func (s *StandaloneSuite) TestGet404(c *C) { arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) - r, n, url2, err := kc.Get(hash) + r, n, _, err := kc.Get(hash) c.Check(err, Equals, BlockNotFound) c.Check(n, Equals, int64(0)) - c.Check(url2, Equals, "") - c.Check(r, Equals, nil) + c.Check(r, IsNil) } func (s *StandaloneSuite) TestGetEmptyBlock(c *C) { @@ -669,18 +762,18 @@ func (s *StandaloneSuite) TestGetEmptyBlock(c *C) { arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) - r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0") + r, n, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0") c.Check(err, IsNil) c.Check(n, Equals, int64(0)) - c.Check(url2, Equals, "") c.Assert(r, NotNil) buf, err := ioutil.ReadAll(r) c.Check(err, IsNil) c.Check(buf, DeepEquals, []byte{}) + c.Check(r.Close(), IsNil) } func (s *StandaloneSuite) TestGetFail(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := FailHandler{make(chan string, 1)} @@ -694,57 +787,84 @@ func (s *StandaloneSuite) TestGetFail(c *C) { kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) kc.Retries = 0 - r, n, url2, err := kc.Get(hash) + r, n, _, err := kc.Get(hash) errNotFound, _ := err.(*ErrNotFound) - c.Check(errNotFound, NotNil) - c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true) - c.Check(errNotFound.Temporary(), Equals, true) + if c.Check(errNotFound, NotNil) { + c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true) + c.Check(errNotFound.Temporary(), Equals, true) + } c.Check(n, Equals, int64(0)) - c.Check(url2, Equals, "") - c.Check(r, Equals, nil) + c.Check(r, IsNil) } func (s *StandaloneSuite) TestGetFailRetry(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - - st := &FailThenSucceedHandler{ - handled: make(chan string, 1), - successhandler: StubGetHandler{ - c, - hash, - "abc123", - http.StatusOK, - []byte("foo")}} - - ks := RunFakeKeepServer(st) - defer ks.listener.Close() - - arv, err := arvadosclient.MakeArvadosClient() - c.Check(err, IsNil) - kc, _ := MakeKeepClient(arv) - arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) - - r, n, url2, err := kc.Get(hash) - defer r.Close() - c.Check(err, Equals, nil) - c.Check(n, Equals, int64(3)) - c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash)) - - content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) - c.Check(content, DeepEquals, []byte("foo")) - - c.Logf("%q", st.reqIDs) - c.Assert(len(st.reqIDs) > 1, Equals, true) - for _, reqid := range st.reqIDs { - c.Check(reqid, Not(Equals), "") - c.Check(reqid, Equals, st.reqIDs[0]) + defer func(origDefault, origMinimum time.Duration) { + DefaultRetryDelay = origDefault + MinimumRetryDelay = origMinimum + }(DefaultRetryDelay, MinimumRetryDelay) + DefaultRetryDelay = time.Second / 8 + MinimumRetryDelay = time.Millisecond + + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) + + for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { + c.Logf("=== initial delay %v", delay) + + st := &FailThenSucceedHandler{ + morefails: 2, + handled: make(chan string, 4), + successhandler: StubGetHandler{ + c, + hash, + "abc123", + http.StatusOK, + []byte("foo")}} + + ks := RunFakeKeepServer(st) + defer ks.listener.Close() + + arv, err := arvadosclient.MakeArvadosClient() + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) + arv.ApiToken = "abc123" + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) + kc.Retries = 3 + kc.RetryDelay = delay + kc.DiskCacheSize = DiskCacheDisabled + + t0 := time.Now() + r, n, _, err := kc.Get(hash) + c.Assert(err, IsNil) + c.Check(n, Equals, int64(3)) + elapsed := time.Since(t0) + + nonsleeptime := time.Second / 10 + expect := kc.RetryDelay + if expect == 0 { + expect = DefaultRetryDelay + } + min := MinimumRetryDelay * 3 + max := expect + expect*2 + expect*2*2 + nonsleeptime + c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min)) + c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max)) + + content, err := ioutil.ReadAll(r) + c.Check(err, IsNil) + c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) + + c.Logf("%q", st.reqIDs) + if c.Check(st.reqIDs, Not(HasLen), 0) { + for _, reqid := range st.reqIDs { + c.Check(reqid, Not(Equals), "") + c.Check(reqid, Equals, st.reqIDs[0]) + } + } } } func (s *StandaloneSuite) TestGetNetError(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) arv, err := arvadosclient.MakeArvadosClient() c.Check(err, IsNil) @@ -752,19 +872,19 @@ func (s *StandaloneSuite) TestGetNetError(c *C) { arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil) - r, n, url2, err := kc.Get(hash) + r, n, _, err := kc.Get(hash) errNotFound, _ := err.(*ErrNotFound) - c.Check(errNotFound, NotNil) - c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true) - c.Check(errNotFound.Temporary(), Equals, true) + if c.Check(errNotFound, NotNil) { + c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true) + c.Check(errNotFound.Temporary(), Equals, true) + } c.Check(n, Equals, int64(0)) - c.Check(url2, Equals, "") - c.Check(r, Equals, nil) + c.Check(r, IsNil) } func (s *StandaloneSuite) TestGetWithServiceHint(c *C) { uuid := "zzzzz-bi6l4-123451234512345" - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) // This one shouldn't be used: ks0 := RunFakeKeepServer(StubGetHandler{ @@ -792,22 +912,21 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) { nil, map[string]string{uuid: ks.url}) - r, n, uri, err := kc.Get(hash + "+K@" + uuid) - defer r.Close() - c.Check(err, Equals, nil) + r, n, _, err := kc.Get(hash + "+K@" + uuid) + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid)) content, err := ioutil.ReadAll(r) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) } // Use a service hint to fetch from a local disk service, overriding // rendezvous probe order. func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) { uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz" - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) // This one shouldn't be used, although it appears first in // rendezvous probe order: @@ -815,8 +934,8 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) { c, "error if used", "abc123", - http.StatusOK, - []byte("foo")}) + http.StatusBadGateway, + nil}) defer ks0.listener.Close() // This one should be used: ks := RunFakeKeepServer(StubGetHandler{ @@ -845,20 +964,19 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) { uuid: ks.url}, ) - r, n, uri, err := kc.Get(hash + "+K@" + uuid) - defer r.Close() - c.Check(err, Equals, nil) + r, n, _, err := kc.Get(hash + "+K@" + uuid) + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid)) content, err := ioutil.ReadAll(r) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) } func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) { uuid := "zzzzz-bi6l4-123451234512345" - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) ksLocal := RunFakeKeepServer(StubGetHandler{ c, @@ -884,15 +1002,14 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) { nil, map[string]string{uuid: ksGateway.url}) - r, n, uri, err := kc.Get(hash + "+K@" + uuid) - c.Assert(err, Equals, nil) - defer r.Close() + r, n, _, err := kc.Get(hash + "+K@" + uuid) + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid)) content, err := ioutil.ReadAll(r) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) } type BarHandler struct { @@ -905,8 +1022,8 @@ func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { } func (s *StandaloneSuite) TestChecksum(c *C) { - foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar"))) + foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) + barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar"))) st := BarHandler{make(chan string, 1)} @@ -920,25 +1037,36 @@ func (s *StandaloneSuite) TestChecksum(c *C) { kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, n, _, err := kc.Get(barhash) - c.Check(err, IsNil) - _, err = ioutil.ReadAll(r) - c.Check(n, Equals, int64(3)) - c.Check(err, Equals, nil) + if c.Check(err, IsNil) { + _, err = ioutil.ReadAll(r) + c.Check(n, Equals, int64(3)) + c.Check(err, IsNil) + } - <-st.handled + select { + case <-st.handled: + case <-time.After(time.Second): + c.Fatal("timed out") + } r, n, _, err = kc.Get(foohash) - c.Check(err, IsNil) - _, err = ioutil.ReadAll(r) - c.Check(n, Equals, int64(3)) + if err == nil { + buf, readerr := ioutil.ReadAll(r) + c.Logf("%q", buf) + err = readerr + } c.Check(err, Equals, BadChecksum) - <-st.handled + select { + case <-st.handled: + case <-time.After(time.Second): + c.Fatal("timed out") + } } func (s *StandaloneSuite) TestGetWithFailures(c *C) { content := []byte("waz") - hash := fmt.Sprintf("%x", md5.Sum(content)) + hash := fmt.Sprintf("%x+3", md5.Sum(content)) fh := Error404Handler{ make(chan string, 4)} @@ -982,16 +1110,20 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) { // an example that passes this Assert.) c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url) - r, n, url2, err := kc.Get(hash) + r, n, _, err := kc.Get(hash) - <-fh.handled - c.Check(err, Equals, nil) + select { + case <-fh.handled: + case <-time.After(time.Second): + c.Fatal("timed out") + } + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash)) readContent, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) + c.Check(err2, IsNil) c.Check(readContent, DeepEquals, content) + c.Check(r.Close(), IsNil) } func (s *ServerRequiredSuite) TestPutGetHead(c *C) { @@ -1000,9 +1132,9 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { arv, err := arvadosclient.MakeArvadosClient() c.Check(err, IsNil) kc, err := MakeKeepClient(arv) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) - hash := fmt.Sprintf("%x", md5.Sum(content)) + hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content)) { n, _, err := kc.Ask(hash) @@ -1011,29 +1143,32 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { } { hash2, replicas, err := kc.PutB(content) - c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content))) + c.Check(err, IsNil) + c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`) c.Check(replicas, Equals, 2) - c.Check(err, Equals, nil) } { - r, n, url2, err := kc.Get(hash) - c.Check(err, Equals, nil) + r, n, _, err := kc.Get(hash) + c.Check(err, IsNil) c.Check(n, Equals, int64(len(content))) - c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash)) - - readContent, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) - c.Check(readContent, DeepEquals, content) + if c.Check(r, NotNil) { + readContent, err := ioutil.ReadAll(r) + c.Check(err, IsNil) + if c.Check(len(readContent), Equals, len(content)) { + c.Check(readContent, DeepEquals, content) + } + c.Check(r.Close(), IsNil) + } } { n, url2, err := kc.Ask(hash) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(n, Equals, int64(len(content))) - c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash)) + c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E") } { loc, err := kc.LocalLocator(hash) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Assert(len(loc) >= 32, Equals, true) c.Check(loc[:32], Equals, hash[:32]) } @@ -1080,7 +1215,7 @@ func (s *StandaloneSuite) TestPutProxy(c *C) { _, replicas, err := kc.PutB([]byte("foo")) <-st.handled - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(replicas, Equals, 2) } @@ -1114,7 +1249,7 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) { func (s *StandaloneSuite) TestMakeLocator(c *C) { l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678") - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce") c.Check(l.Size, Equals, 3) c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"}) @@ -1122,7 +1257,7 @@ func (s *StandaloneSuite) TestMakeLocator(c *C) { func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) { l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce") - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce") c.Check(l.Size, Equals, -1) c.Check(l.Hints, DeepEquals, []string{}) @@ -1130,7 +1265,7 @@ func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) { func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) { l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678") - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce") c.Check(l.Size, Equals, -1) c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"}) @@ -1139,7 +1274,7 @@ func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) { func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) { str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar" l, err := MakeLocator(str) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce") c.Check(l.Size, Equals, 3) c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"}) @@ -1159,7 +1294,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C expectPath: hash, expectAPIToken: "abc123", expectBody: "foo", - expectStorageClass: "", + expectStorageClass: "default", returnStorageClasses: "", handled: make(chan string, 5), } @@ -1245,14 +1380,14 @@ func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reque } func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetIndexHandler{ c, "/index", "abc123", http.StatusOK, - []byte(hash + "+3 1443559274\n\n")} + []byte(hash + " 1443559274\n\n")} ks := RunFakeKeepServer(st) defer ks.listener.Close() @@ -1268,19 +1403,19 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) { c.Check(err, IsNil) content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) + c.Check(err2, IsNil) c.Check(content, DeepEquals, st.body[0:len(st.body)-1]) } func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetIndexHandler{ c, "/index/" + hash[0:3], "abc123", http.StatusOK, - []byte(hash + "+3 1443559274\n\n")} + []byte(hash + " 1443559274\n\n")} ks := RunFakeKeepServer(st) defer ks.listener.Close() @@ -1292,15 +1427,15 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) { kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", hash[0:3]) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) + c.Check(err2, IsNil) c.Check(content, DeepEquals, st.body[0:len(st.body)-1]) } func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetIndexHandler{ c, @@ -1323,7 +1458,7 @@ func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) { } func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetIndexHandler{ c, @@ -1363,55 +1498,83 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) { kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", "abcd") - c.Check(err, Equals, nil) + c.Check(err, IsNil) content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) + c.Check(err2, IsNil) c.Check(content, DeepEquals, st.body[0:len(st.body)-1]) } func (s *StandaloneSuite) TestPutBRetry(c *C) { - st := &FailThenSucceedHandler{ - handled: make(chan string, 1), - successhandler: &StubPutHandler{ - c: c, - expectPath: Md5String("foo"), - expectAPIToken: "abc123", - expectBody: "foo", - expectStorageClass: "", - returnStorageClasses: "", - handled: make(chan string, 5), - }, - } + defer func(origDefault, origMinimum time.Duration) { + DefaultRetryDelay = origDefault + MinimumRetryDelay = origMinimum + }(DefaultRetryDelay, MinimumRetryDelay) + DefaultRetryDelay = time.Second / 8 + MinimumRetryDelay = time.Millisecond + + for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { + c.Logf("=== initial delay %v", delay) + + st := &FailThenSucceedHandler{ + morefails: 5, // handler will fail 6x in total, 3 for each server + handled: make(chan string, 10), + successhandler: &StubPutHandler{ + c: c, + expectPath: Md5String("foo"), + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + }, + } - arv, _ := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(arv) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Retries = 3 + kc.RetryDelay = delay + kc.DiskCacheSize = DiskCacheDisabled + kc.Want_replicas = 2 - kc.Want_replicas = 2 - arv.ApiToken = "abc123" - localRoots := make(map[string]string) - writableLocalRoots := make(map[string]string) + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) - ks := RunSomeFakeKeepServers(st, 2) + ks := RunSomeFakeKeepServers(st, 2) - for i, k := range ks { - localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url - writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url - defer k.listener.Close() - } + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } - kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) - hash, replicas, err := kc.PutB([]byte("foo")) + t0 := time.Now() + hash, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, Equals, nil) - c.Check(hash, Equals, "") - c.Check(replicas, Equals, 2) + c.Check(err, IsNil) + c.Check(hash, Equals, "") + c.Check(replicas, Equals, 2) + elapsed := time.Since(t0) + + nonsleeptime := time.Second / 10 + expect := kc.RetryDelay + if expect == 0 { + expect = DefaultRetryDelay + } + min := MinimumRetryDelay * 3 + max := expect + expect*2 + expect*2*2 + max += nonsleeptime + c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min)) + c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max)) + } } func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) // Add an additional "testblobstore" keepservice blobKeepService := make(arvadosclient.Dict) @@ -1421,13 +1584,13 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { "service_port": "21321", "service_type": "testblobstore"}}, &blobKeepService) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }() RefreshServiceDiscovery() // Make a keepclient and ensure that the testblobstore is included kc, err := MakeKeepClient(arv) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) // verify kc.LocalRoots c.Check(len(kc.LocalRoots()), Equals, 3) @@ -1454,3 +1617,51 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { c.Assert(kc.foundNonDiskSvc, Equals, true) c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second) } + +func (s *StandaloneSuite) TestDelayCalculator(c *C) { + defer func(origDefault, origMinimum time.Duration) { + DefaultRetryDelay = origDefault + MinimumRetryDelay = origMinimum + }(DefaultRetryDelay, MinimumRetryDelay) + + checkInterval := func(d, min, max time.Duration) { + c.Check(d >= min, Equals, true) + c.Check(d <= max, Equals, true) + } + + MinimumRetryDelay = time.Second / 2 + DefaultRetryDelay = time.Second + dc := delayCalculator{InitialMaxDelay: 0} + checkInterval(dc.Next(), time.Second/2, time.Second) + checkInterval(dc.Next(), time.Second/2, time.Second*2) + checkInterval(dc.Next(), time.Second/2, time.Second*4) + checkInterval(dc.Next(), time.Second/2, time.Second*8) + checkInterval(dc.Next(), time.Second/2, time.Second*10) + checkInterval(dc.Next(), time.Second/2, time.Second*10) + + // Enforce non-zero InitialMaxDelay + dc = delayCalculator{InitialMaxDelay: time.Second} + checkInterval(dc.Next(), time.Second/2, time.Second*2) + checkInterval(dc.Next(), time.Second/2, time.Second*4) + checkInterval(dc.Next(), time.Second/2, time.Second*8) + checkInterval(dc.Next(), time.Second/2, time.Second*16) + checkInterval(dc.Next(), time.Second/2, time.Second*20) + checkInterval(dc.Next(), time.Second/2, time.Second*20) + + // Enforce MinimumRetryDelay + dc = delayCalculator{InitialMaxDelay: time.Millisecond} + checkInterval(dc.Next(), time.Second/2, time.Second/2) + checkInterval(dc.Next(), time.Second/2, time.Second) + checkInterval(dc.Next(), time.Second/2, time.Second*2) + checkInterval(dc.Next(), time.Second/2, time.Second*4) + checkInterval(dc.Next(), time.Second/2, time.Second*8) + checkInterval(dc.Next(), time.Second/2, time.Second*10) + checkInterval(dc.Next(), time.Second/2, time.Second*10) + + // If InitialMaxDelay is less than MinimumRetryDelay/10, then + // delay is always MinimumRetryDelay. + dc = delayCalculator{InitialMaxDelay: time.Millisecond} + for i := 0; i < 20; i++ { + c.Check(dc.Next(), Equals, time.Second/2) + } +}