X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/865e5c1e3730117870eb1e485d553383626b882f..33bb622ec2846f9e3c788655f42d7a7f25a4651c:/sdk/go/keepclient/keepclient_test.go diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index a1801b2145..4cfe1a0896 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -6,8 +6,8 @@ package keepclient import ( "bytes" + "context" "crypto/md5" - "errors" "fmt" "io" "io/ioutil" @@ -16,16 +16,19 @@ import ( "net/http" "os" "strings" + "sync" + "sync/atomic" "testing" "time" + "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/arvadostest" . "gopkg.in/check.v1" ) -// Gocheck boilerplate func Test(t *testing.T) { + DefaultRetryDelay = 50 * time.Millisecond TestingT(t) } @@ -39,8 +42,16 @@ type ServerRequiredSuite struct{} // Standalone tests type StandaloneSuite struct{} +var origHOME = os.Getenv("HOME") + func (s *StandaloneSuite) SetUpTest(c *C) { RefreshServiceDiscovery() + // Prevent cache state from leaking between test cases + os.Setenv("HOME", c.MkDir()) +} + +func (s *StandaloneSuite) TearDownTest(c *C) { + os.Setenv("HOME", origHOME) } func pythonDir() string { @@ -49,35 +60,49 @@ func pythonDir() string { } func (s *ServerRequiredSuite) SetUpSuite(c *C) { - arvadostest.StartAPI() arvadostest.StartKeep(2, false) } func (s *ServerRequiredSuite) TearDownSuite(c *C) { arvadostest.StopKeep(2) - arvadostest.StopAPI() + os.Setenv("HOME", origHOME) } func (s *ServerRequiredSuite) SetUpTest(c *C) { RefreshServiceDiscovery() + // Prevent cache state from leaking between test cases + os.Setenv("HOME", c.MkDir()) } func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) { arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) kc, err := MakeKeepClient(arv) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) c.Check(len(kc.LocalRoots()), Equals, 2) for _, root := range kc.LocalRoots() { c.Check(root, Matches, "http://localhost:\\d+") } } +func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) { + arv, err := arvadosclient.MakeArvadosClient() + c.Assert(err, IsNil) + + cc, err := arv.ClusterConfig("StorageClasses") + c.Assert(err, IsNil) + c.Assert(cc, NotNil) + c.Assert(cc.(map[string]interface{})["default"], NotNil) + + kc := New(arv) + c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"}) +} + func (s *ServerRequiredSuite) TestDefaultReplications(c *C) { arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) kc, err := MakeKeepClient(arv) c.Check(err, IsNil) @@ -95,21 +120,33 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) { } type StubPutHandler struct { - c *C - expectPath string - expectApiToken string - expectBody string - expectStorageClass string - handled chan string + c *C + expectPath string + expectAPIToken string + expectBody string + expectStorageClass string + returnStorageClasses string + handled chan string + requests []*http.Request + mtx sync.Mutex } -func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + sph.mtx.Lock() + sph.requests = append(sph.requests, req) + sph.mtx.Unlock() sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath) - sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken)) - sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass) + sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken)) + if sph.expectStorageClass != "*" { + sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass) + } body, err := ioutil.ReadAll(req.Body) - sph.c.Check(err, Equals, nil) + sph.c.Check(err, IsNil) sph.c.Check(body, DeepEquals, []byte(sph.expectBody)) + resp.Header().Set("X-Keep-Replicas-Stored", "1") + if sph.returnStorageClasses != "" { + resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses) + } resp.WriteHeader(200) sph.handled <- fmt.Sprintf("http://%s", req.Host) } @@ -120,7 +157,7 @@ func RunFakeKeepServer(st http.Handler) (ks KeepServer) { // bind to 0.0.0.0 or [::] which is not a valid address for Dial() ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0}) if err != nil { - panic(fmt.Sprintf("Could not listen on any port")) + panic("Could not listen on any port") } ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String()) go http.Serve(ks.listener, st) @@ -139,54 +176,240 @@ func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string, kc, _ := MakeKeepClient(arv) reader, writer := io.Pipe() - upload_status := make(chan uploadStatus) + uploadStatusChan := make(chan uploadStatus) - f(kc, ks.url, reader, writer, upload_status) + f(kc, ks.url, reader, writer, uploadStatusChan) } func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { log.Printf("TestUploadToStubKeepServer") - st := StubPutHandler{ - c, - "acbd18db4cc2f85cedef654fccc4a4d8", - "abc123", - "foo", - "hot", - make(chan string)} + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "default=1", + handled: make(chan string), + } UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) { - kc.StorageClasses = []string{"hot"} - go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), kc.getRequestID()) + func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) writer.Write([]byte("foo")) writer.Close() <-st.handled - status := <-upload_status - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) }) } func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { - st := StubPutHandler{ - c, - "acbd18db4cc2f85cedef654fccc4a4d8", - "abc123", - "foo", - "", - make(chan string)} + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "default=1", + handled: make(chan string), + } UploadToStubHelper(c, st, - func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) { - go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, kc.getRequestID()) + func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID()) <-st.handled - status := <-upload_status - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) + }) +} + +func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) { + for _, trial := range []struct { + respHeader string + expectMap map[string]int + }{ + {"", nil}, + {"foo=1", map[string]int{"foo": 1}}, + {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}}, + {" =foo=1 ", nil}, + {"foo", nil}, + } { + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: trial.respHeader, + handled: make(chan string), + } + + UploadToStubHelper(c, st, + func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) + + writer.Write([]byte("foo")) + writer.Close() + + <-st.handled + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""}) + }) + } +} + +func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) { + nServers := 5 + for _, trial := range []struct { + replicas int + clientClasses []string + putClasses []string + minRequests int + maxRequests int + success bool + }{ + // Talking to an older cluster (no default storage classes exported + // config) and no other additional storage classes requirements. + {1, nil, nil, 1, 1, true}, + {2, nil, nil, 2, 2, true}, + {3, nil, nil, 3, 3, true}, + {nServers*2 + 1, nil, nil, nServers, nServers, false}, + + {1, []string{"class1"}, nil, 1, 1, true}, + {2, []string{"class1"}, nil, 2, 2, true}, + {3, []string{"class1"}, nil, 3, 3, true}, + {1, []string{"class1", "class2"}, nil, 1, 1, true}, + {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false}, + + {1, nil, []string{"class1"}, 1, 1, true}, + {2, nil, []string{"class1"}, 2, 2, true}, + {3, nil, []string{"class1"}, 3, 3, true}, + {1, nil, []string{"class1", "class2"}, 1, 1, true}, + {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false}, + } { + c.Logf("%+v", trial) + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "*", + returnStorageClasses: "", // Simulate old cluster without SC keep support + handled: make(chan string, 100), + } + ks := RunSomeFakeKeepServers(st, nServers) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Want_replicas = trial.replicas + kc.StorageClasses = trial.clientClasses + kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + + _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Data: []byte("foo"), + StorageClasses: trial.putClasses, + }) + if trial.success { + c.Check(err, IsNil) + } else { + c.Check(err, NotNil) + } + c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) + c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) + if trial.clientClasses == nil && trial.putClasses == nil { + c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "") + } + } +} + +func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) { + nServers := 5 + for _, trial := range []struct { + replicas int + defaultClasses []string + clientClasses []string // clientClasses takes precedence over defaultClasses + putClasses []string // putClasses takes precedence over clientClasses + minRequests int + maxRequests int + success bool + }{ + {1, []string{"class1"}, nil, nil, 1, 1, true}, + {2, []string{"class1"}, nil, nil, 1, 2, true}, + {3, []string{"class1"}, nil, nil, 2, 3, true}, + {1, []string{"class1", "class2"}, nil, nil, 1, 1, true}, + + // defaultClasses doesn't matter when any of the others is specified. + {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true}, + {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true}, + {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true}, + {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true}, + {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true}, + {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false}, + {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false}, + } { + c.Logf("%+v", trial) + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "*", + returnStorageClasses: "class1=2, class2=2", + handled: make(chan string, 100), + } + ks := RunSomeFakeKeepServers(st, nServers) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Want_replicas = trial.replicas + kc.StorageClasses = trial.clientClasses + kc.DefaultStorageClasses = trial.defaultClasses + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + + _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Data: []byte("foo"), + StorageClasses: trial.putClasses, }) + if trial.success { + c.Check(err, IsNil) + } else { + c.Check(err, NotNil) + } + c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) + c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) + if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) { + // Max concurrency should be 1. First request + // should have succeeded for class1. Second + // request should only ask for class404. + c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404") + } + } } type FailHandler struct { @@ -199,17 +422,17 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { } type FailThenSucceedHandler struct { + morefails int // fail 1 + this many times before succeeding handled chan string - count int + count atomic.Int64 successhandler http.Handler reqIDs []string } func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id")) - if fh.count == 0 { + if int(fh.count.Add(1)) <= fh.morefails+1 { resp.WriteHeader(500) - fh.count += 1 fh.handled <- fmt.Sprintf("http://%s", req.Host) } else { fh.successhandler.ServeHTTP(resp, req) @@ -233,16 +456,16 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { UploadToStubHelper(c, st, func(kc *KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan uploadStatus) { + writer io.WriteCloser, uploadStatusChan chan uploadStatus) { - go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, kc.getRequestID()) + go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID()) writer.Write([]byte("foo")) writer.Close() <-st.handled - status := <-upload_status + status := <-uploadStatusChan c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash)) c.Check(status.statusCode, Equals, 500) }) @@ -256,7 +479,7 @@ type KeepServer struct { func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) { ks = make([]KeepServer, n) - for i := 0; i < n; i += 1 { + for i := 0; i < n; i++ { ks[i] = RunFakeKeepServer(st) } @@ -266,13 +489,15 @@ func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) { func (s *StandaloneSuite) TestPutB(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -308,13 +533,15 @@ func (s *StandaloneSuite) TestPutB(c *C) { func (s *StandaloneSuite) TestPutHR(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -334,14 +561,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) { kc.SetServiceRoots(localRoots, writableLocalRoots, nil) - reader, writer := io.Pipe() - - go func() { - writer.Write([]byte("foo")) - writer.Close() - }() - - kc.PutHR(hash, reader, 3) + kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3) shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots() @@ -357,13 +577,15 @@ func (s *StandaloneSuite) TestPutHR(c *C) { func (s *StandaloneSuite) TestPutWithFail(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 4)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 4), + } fh := FailHandler{ make(chan string, 1)} @@ -401,7 +623,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { <-fh.handled - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(phash, Equals, "") c.Check(replicas, Equals, 2) @@ -417,13 +639,15 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 1)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 1), + } fh := FailHandler{ make(chan string, 4)} @@ -456,7 +680,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, ks1[0].url) } @@ -464,21 +688,21 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { type StubGetHandler struct { c *C expectPath string - expectApiToken string + expectAPIToken string httpStatus int body []byte } func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath) - sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken)) + sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken)) resp.WriteHeader(sgh.httpStatus) resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body))) resp.Write(sgh.body) } func (s *StandaloneSuite) TestGet(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetHandler{ c, @@ -496,19 +720,18 @@ func (s *StandaloneSuite) TestGet(c *C) { arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) - r, n, url2, err := kc.Get(hash) - defer r.Close() - c.Check(err, Equals, nil) + r, n, _, err := kc.Get(hash) + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash)) content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) + c.Check(err2, IsNil) c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) } func (s *StandaloneSuite) TestGet404(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := Error404Handler{make(chan string, 1)} @@ -521,11 +744,10 @@ func (s *StandaloneSuite) TestGet404(c *C) { arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) - r, n, url2, err := kc.Get(hash) + r, n, _, err := kc.Get(hash) c.Check(err, Equals, BlockNotFound) c.Check(n, Equals, int64(0)) - c.Check(url2, Equals, "") - c.Check(r, Equals, nil) + c.Check(r, IsNil) } func (s *StandaloneSuite) TestGetEmptyBlock(c *C) { @@ -535,22 +757,23 @@ func (s *StandaloneSuite) TestGetEmptyBlock(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() + c.Check(err, IsNil) kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) - r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0") + r, n, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0") c.Check(err, IsNil) c.Check(n, Equals, int64(0)) - c.Check(url2, Equals, "") c.Assert(r, NotNil) buf, err := ioutil.ReadAll(r) c.Check(err, IsNil) c.Check(buf, DeepEquals, []byte{}) + c.Check(r.Close(), IsNil) } func (s *StandaloneSuite) TestGetFail(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := FailHandler{make(chan string, 1)} @@ -564,57 +787,84 @@ func (s *StandaloneSuite) TestGetFail(c *C) { kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) kc.Retries = 0 - r, n, url2, err := kc.Get(hash) + r, n, _, err := kc.Get(hash) errNotFound, _ := err.(*ErrNotFound) - c.Check(errNotFound, NotNil) - c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true) - c.Check(errNotFound.Temporary(), Equals, true) + if c.Check(errNotFound, NotNil) { + c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true) + c.Check(errNotFound.Temporary(), Equals, true) + } c.Check(n, Equals, int64(0)) - c.Check(url2, Equals, "") - c.Check(r, Equals, nil) + c.Check(r, IsNil) } func (s *StandaloneSuite) TestGetFailRetry(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - - st := &FailThenSucceedHandler{ - handled: make(chan string, 1), - successhandler: StubGetHandler{ - c, - hash, - "abc123", - http.StatusOK, - []byte("foo")}} - - ks := RunFakeKeepServer(st) - defer ks.listener.Close() - - arv, err := arvadosclient.MakeArvadosClient() - c.Check(err, IsNil) - kc, _ := MakeKeepClient(arv) - arv.ApiToken = "abc123" - kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) - - r, n, url2, err := kc.Get(hash) - defer r.Close() - c.Check(err, Equals, nil) - c.Check(n, Equals, int64(3)) - c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash)) - - content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) - c.Check(content, DeepEquals, []byte("foo")) - - c.Logf("%q", st.reqIDs) - c.Assert(len(st.reqIDs) > 1, Equals, true) - for _, reqid := range st.reqIDs { - c.Check(reqid, Not(Equals), "") - c.Check(reqid, Equals, st.reqIDs[0]) + defer func(origDefault, origMinimum time.Duration) { + DefaultRetryDelay = origDefault + MinimumRetryDelay = origMinimum + }(DefaultRetryDelay, MinimumRetryDelay) + DefaultRetryDelay = time.Second / 8 + MinimumRetryDelay = time.Millisecond + + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) + + for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { + c.Logf("=== initial delay %v", delay) + + st := &FailThenSucceedHandler{ + morefails: 2, + handled: make(chan string, 4), + successhandler: StubGetHandler{ + c, + hash, + "abc123", + http.StatusOK, + []byte("foo")}} + + ks := RunFakeKeepServer(st) + defer ks.listener.Close() + + arv, err := arvadosclient.MakeArvadosClient() + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) + arv.ApiToken = "abc123" + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) + kc.Retries = 3 + kc.RetryDelay = delay + kc.DiskCacheSize = DiskCacheDisabled + + t0 := time.Now() + r, n, _, err := kc.Get(hash) + c.Assert(err, IsNil) + c.Check(n, Equals, int64(3)) + elapsed := time.Since(t0) + + nonsleeptime := time.Second / 10 + expect := kc.RetryDelay + if expect == 0 { + expect = DefaultRetryDelay + } + min := MinimumRetryDelay * 3 + max := expect + expect*2 + expect*2*2 + nonsleeptime + c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min)) + c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max)) + + content, err := ioutil.ReadAll(r) + c.Check(err, IsNil) + c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) + + c.Logf("%q", st.reqIDs) + if c.Check(st.reqIDs, Not(HasLen), 0) { + for _, reqid := range st.reqIDs { + c.Check(reqid, Not(Equals), "") + c.Check(reqid, Equals, st.reqIDs[0]) + } + } } } func (s *StandaloneSuite) TestGetNetError(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) arv, err := arvadosclient.MakeArvadosClient() c.Check(err, IsNil) @@ -622,19 +872,19 @@ func (s *StandaloneSuite) TestGetNetError(c *C) { arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil) - r, n, url2, err := kc.Get(hash) + r, n, _, err := kc.Get(hash) errNotFound, _ := err.(*ErrNotFound) - c.Check(errNotFound, NotNil) - c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true) - c.Check(errNotFound.Temporary(), Equals, true) + if c.Check(errNotFound, NotNil) { + c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true) + c.Check(errNotFound.Temporary(), Equals, true) + } c.Check(n, Equals, int64(0)) - c.Check(url2, Equals, "") - c.Check(r, Equals, nil) + c.Check(r, IsNil) } func (s *StandaloneSuite) TestGetWithServiceHint(c *C) { uuid := "zzzzz-bi6l4-123451234512345" - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) // This one shouldn't be used: ks0 := RunFakeKeepServer(StubGetHandler{ @@ -662,22 +912,21 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) { nil, map[string]string{uuid: ks.url}) - r, n, uri, err := kc.Get(hash + "+K@" + uuid) - defer r.Close() - c.Check(err, Equals, nil) + r, n, _, err := kc.Get(hash + "+K@" + uuid) + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid)) content, err := ioutil.ReadAll(r) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) } // Use a service hint to fetch from a local disk service, overriding // rendezvous probe order. func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) { uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz" - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) // This one shouldn't be used, although it appears first in // rendezvous probe order: @@ -685,8 +934,8 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) { c, "error if used", "abc123", - http.StatusOK, - []byte("foo")}) + http.StatusBadGateway, + nil}) defer ks0.listener.Close() // This one should be used: ks := RunFakeKeepServer(StubGetHandler{ @@ -715,20 +964,19 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) { uuid: ks.url}, ) - r, n, uri, err := kc.Get(hash + "+K@" + uuid) - defer r.Close() - c.Check(err, Equals, nil) + r, n, _, err := kc.Get(hash + "+K@" + uuid) + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid)) content, err := ioutil.ReadAll(r) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) } func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) { uuid := "zzzzz-bi6l4-123451234512345" - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) ksLocal := RunFakeKeepServer(StubGetHandler{ c, @@ -754,29 +1002,28 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) { nil, map[string]string{uuid: ksGateway.url}) - r, n, uri, err := kc.Get(hash + "+K@" + uuid) - c.Assert(err, Equals, nil) - defer r.Close() + r, n, _, err := kc.Get(hash + "+K@" + uuid) + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid)) content, err := ioutil.ReadAll(r) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(content, DeepEquals, []byte("foo")) + c.Check(r.Close(), IsNil) } type BarHandler struct { handled chan string } -func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { resp.Write([]byte("bar")) - this.handled <- fmt.Sprintf("http://%s", req.Host) + h.handled <- fmt.Sprintf("http://%s", req.Host) } func (s *StandaloneSuite) TestChecksum(c *C) { - foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar"))) + foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) + barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar"))) st := BarHandler{make(chan string, 1)} @@ -790,25 +1037,36 @@ func (s *StandaloneSuite) TestChecksum(c *C) { kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, n, _, err := kc.Get(barhash) - c.Check(err, IsNil) - _, err = ioutil.ReadAll(r) - c.Check(n, Equals, int64(3)) - c.Check(err, Equals, nil) + if c.Check(err, IsNil) { + _, err = ioutil.ReadAll(r) + c.Check(n, Equals, int64(3)) + c.Check(err, IsNil) + } - <-st.handled + select { + case <-st.handled: + case <-time.After(time.Second): + c.Fatal("timed out") + } r, n, _, err = kc.Get(foohash) - c.Check(err, IsNil) - _, err = ioutil.ReadAll(r) - c.Check(n, Equals, int64(3)) + if err == nil { + buf, readerr := ioutil.ReadAll(r) + c.Logf("%q", buf) + err = readerr + } c.Check(err, Equals, BadChecksum) - <-st.handled + select { + case <-st.handled: + case <-time.After(time.Second): + c.Fatal("timed out") + } } func (s *StandaloneSuite) TestGetWithFailures(c *C) { content := []byte("waz") - hash := fmt.Sprintf("%x", md5.Sum(content)) + hash := fmt.Sprintf("%x+3", md5.Sum(content)) fh := Error404Handler{ make(chan string, 4)} @@ -852,16 +1110,20 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) { // an example that passes this Assert.) c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url) - r, n, url2, err := kc.Get(hash) + r, n, _, err := kc.Get(hash) - <-fh.handled - c.Check(err, Equals, nil) + select { + case <-fh.handled: + case <-time.After(time.Second): + c.Fatal("timed out") + } + c.Assert(err, IsNil) c.Check(n, Equals, int64(3)) - c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash)) - read_content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) - c.Check(read_content, DeepEquals, content) + readContent, err2 := ioutil.ReadAll(r) + c.Check(err2, IsNil) + c.Check(readContent, DeepEquals, content) + c.Check(r.Close(), IsNil) } func (s *ServerRequiredSuite) TestPutGetHead(c *C) { @@ -870,9 +1132,9 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { arv, err := arvadosclient.MakeArvadosClient() c.Check(err, IsNil) kc, err := MakeKeepClient(arv) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) - hash := fmt.Sprintf("%x", md5.Sum(content)) + hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content)) { n, _, err := kc.Ask(hash) @@ -881,29 +1143,32 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { } { hash2, replicas, err := kc.PutB(content) - c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content))) + c.Check(err, IsNil) + c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`) c.Check(replicas, Equals, 2) - c.Check(err, Equals, nil) } { - r, n, url2, err := kc.Get(hash) - c.Check(err, Equals, nil) + r, n, _, err := kc.Get(hash) + c.Check(err, IsNil) c.Check(n, Equals, int64(len(content))) - c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash)) - - read_content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) - c.Check(read_content, DeepEquals, content) + if c.Check(r, NotNil) { + readContent, err := ioutil.ReadAll(r) + c.Check(err, IsNil) + if c.Check(len(readContent), Equals, len(content)) { + c.Check(readContent, DeepEquals, content) + } + c.Check(r.Close(), IsNil) + } } { n, url2, err := kc.Ask(hash) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(n, Equals, int64(len(content))) - c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash)) + c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E") } { loc, err := kc.LocalLocator(hash) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Assert(len(loc) >= 32, Equals, true) c.Check(loc[:32], Equals, hash[:32]) } @@ -920,9 +1185,9 @@ type StubProxyHandler struct { handled chan string } -func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { resp.Header().Set("X-Keep-Replicas-Stored", "2") - this.handled <- fmt.Sprintf("http://%s", req.Host) + h.handled <- fmt.Sprintf("http://%s", req.Host) } func (s *StandaloneSuite) TestPutProxy(c *C) { @@ -950,7 +1215,7 @@ func (s *StandaloneSuite) TestPutProxy(c *C) { _, replicas, err := kc.PutB([]byte("foo")) <-st.handled - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(replicas, Equals, 2) } @@ -978,13 +1243,13 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) { _, replicas, err := kc.PutB([]byte("foo")) <-st.handled - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 2) } func (s *StandaloneSuite) TestMakeLocator(c *C) { l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678") - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce") c.Check(l.Size, Equals, 3) c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"}) @@ -992,7 +1257,7 @@ func (s *StandaloneSuite) TestMakeLocator(c *C) { func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) { l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce") - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce") c.Check(l.Size, Equals, -1) c.Check(l.Hints, DeepEquals, []string{}) @@ -1000,7 +1265,7 @@ func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) { func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) { l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678") - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce") c.Check(l.Size, Equals, -1) c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"}) @@ -1009,7 +1274,7 @@ func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) { func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) { str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar" l, err := MakeLocator(str) - c.Check(err, Equals, nil) + c.Check(err, IsNil) c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce") c.Check(l.Size, Equals, 3) c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"}) @@ -1024,13 +1289,15 @@ func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) { func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -1054,7 +1321,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)]) @@ -1063,13 +1330,15 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -1090,7 +1359,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 0) } @@ -1111,14 +1380,14 @@ func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reque } func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetIndexHandler{ c, "/index", "abc123", http.StatusOK, - []byte(hash + "+3 1443559274\n\n")} + []byte(hash + " 1443559274\n\n")} ks := RunFakeKeepServer(st) defer ks.listener.Close() @@ -1134,19 +1403,19 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) { c.Check(err, IsNil) content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) + c.Check(err2, IsNil) c.Check(content, DeepEquals, st.body[0:len(st.body)-1]) } func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetIndexHandler{ c, "/index/" + hash[0:3], "abc123", http.StatusOK, - []byte(hash + "+3 1443559274\n\n")} + []byte(hash + " 1443559274\n\n")} ks := RunFakeKeepServer(st) defer ks.listener.Close() @@ -1158,15 +1427,15 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) { kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", hash[0:3]) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) + c.Check(err2, IsNil) c.Check(content, DeepEquals, st.body[0:len(st.body)-1]) } func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetIndexHandler{ c, @@ -1189,7 +1458,7 @@ func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) { } func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) { - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo"))) st := StubGetIndexHandler{ c, @@ -1229,52 +1498,83 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) { kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", "abcd") - c.Check(err, Equals, nil) + c.Check(err, IsNil) content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) + c.Check(err2, IsNil) c.Check(content, DeepEquals, st.body[0:len(st.body)-1]) } func (s *StandaloneSuite) TestPutBRetry(c *C) { - st := &FailThenSucceedHandler{ - handled: make(chan string, 1), - successhandler: StubPutHandler{ - c, - Md5String("foo"), - "abc123", - "foo", - "", - make(chan string, 5)}} + defer func(origDefault, origMinimum time.Duration) { + DefaultRetryDelay = origDefault + MinimumRetryDelay = origMinimum + }(DefaultRetryDelay, MinimumRetryDelay) + DefaultRetryDelay = time.Second / 8 + MinimumRetryDelay = time.Millisecond + + for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} { + c.Logf("=== initial delay %v", delay) + + st := &FailThenSucceedHandler{ + morefails: 5, // handler will fail 6x in total, 3 for each server + handled: make(chan string, 10), + successhandler: &StubPutHandler{ + c: c, + expectPath: Md5String("foo"), + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + }, + } - arv, _ := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(arv) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Retries = 3 + kc.RetryDelay = delay + kc.DiskCacheSize = DiskCacheDisabled + kc.Want_replicas = 2 - kc.Want_replicas = 2 - arv.ApiToken = "abc123" - localRoots := make(map[string]string) - writableLocalRoots := make(map[string]string) + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) - ks := RunSomeFakeKeepServers(st, 2) + ks := RunSomeFakeKeepServers(st, 2) - for i, k := range ks { - localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url - writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url - defer k.listener.Close() - } + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } - kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) - hash, replicas, err := kc.PutB([]byte("foo")) + t0 := time.Now() + hash, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, Equals, nil) - c.Check(hash, Equals, "") - c.Check(replicas, Equals, 2) + c.Check(err, IsNil) + c.Check(hash, Equals, "") + c.Check(replicas, Equals, 2) + elapsed := time.Since(t0) + + nonsleeptime := time.Second / 10 + expect := kc.RetryDelay + if expect == 0 { + expect = DefaultRetryDelay + } + min := MinimumRetryDelay * 3 + max := expect + expect*2 + expect*2*2 + max += nonsleeptime + c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min)) + c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max)) + } } func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) // Add an additional "testblobstore" keepservice blobKeepService := make(arvadosclient.Dict) @@ -1284,13 +1584,13 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { "service_port": "21321", "service_type": "testblobstore"}}, &blobKeepService) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }() RefreshServiceDiscovery() // Make a keepclient and ensure that the testblobstore is included kc, err := MakeKeepClient(arv) - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) // verify kc.LocalRoots c.Check(len(kc.LocalRoots()), Equals, 3) @@ -1317,3 +1617,51 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { c.Assert(kc.foundNonDiskSvc, Equals, true) c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second) } + +func (s *StandaloneSuite) TestDelayCalculator(c *C) { + defer func(origDefault, origMinimum time.Duration) { + DefaultRetryDelay = origDefault + MinimumRetryDelay = origMinimum + }(DefaultRetryDelay, MinimumRetryDelay) + + checkInterval := func(d, min, max time.Duration) { + c.Check(d >= min, Equals, true) + c.Check(d <= max, Equals, true) + } + + MinimumRetryDelay = time.Second / 2 + DefaultRetryDelay = time.Second + dc := delayCalculator{InitialMaxDelay: 0} + checkInterval(dc.Next(), time.Second/2, time.Second) + checkInterval(dc.Next(), time.Second/2, time.Second*2) + checkInterval(dc.Next(), time.Second/2, time.Second*4) + checkInterval(dc.Next(), time.Second/2, time.Second*8) + checkInterval(dc.Next(), time.Second/2, time.Second*10) + checkInterval(dc.Next(), time.Second/2, time.Second*10) + + // Enforce non-zero InitialMaxDelay + dc = delayCalculator{InitialMaxDelay: time.Second} + checkInterval(dc.Next(), time.Second/2, time.Second*2) + checkInterval(dc.Next(), time.Second/2, time.Second*4) + checkInterval(dc.Next(), time.Second/2, time.Second*8) + checkInterval(dc.Next(), time.Second/2, time.Second*16) + checkInterval(dc.Next(), time.Second/2, time.Second*20) + checkInterval(dc.Next(), time.Second/2, time.Second*20) + + // Enforce MinimumRetryDelay + dc = delayCalculator{InitialMaxDelay: time.Millisecond} + checkInterval(dc.Next(), time.Second/2, time.Second/2) + checkInterval(dc.Next(), time.Second/2, time.Second) + checkInterval(dc.Next(), time.Second/2, time.Second*2) + checkInterval(dc.Next(), time.Second/2, time.Second*4) + checkInterval(dc.Next(), time.Second/2, time.Second*8) + checkInterval(dc.Next(), time.Second/2, time.Second*10) + checkInterval(dc.Next(), time.Second/2, time.Second*10) + + // If InitialMaxDelay is less than MinimumRetryDelay/10, then + // delay is always MinimumRetryDelay. + dc = delayCalculator{InitialMaxDelay: time.Millisecond} + for i := 0; i < 20; i++ { + c.Check(dc.Next(), Equals, time.Second/2) + } +}