X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7a7a1395490eb9b9789b79f88a77fe709a898feb..e10c23d41a9591830a20171a0f9cb6cfe2921b11:/sdk/go/keepclient/keepclient_test.go diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index 2604b02b17..62268fa463 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -6,8 +6,8 @@ package keepclient import ( "bytes" + "context" "crypto/md5" - "errors" "fmt" "io" "io/ioutil" @@ -16,12 +16,15 @@ import ( "net/http" "os" "strings" + "sync" "testing" "time" + "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/arvadostest" . "gopkg.in/check.v1" + check "gopkg.in/check.v1" ) // Gocheck boilerplate @@ -95,21 +98,33 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) { } type StubPutHandler struct { - c *C - expectPath string - expectApiToken string - expectBody string - expectStorageClass string - handled chan string + c *C + expectPath string + expectAPIToken string + expectBody string + expectStorageClass string + returnStorageClasses string + handled chan string + requests []*http.Request + mtx sync.Mutex } -func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + sph.mtx.Lock() + sph.requests = append(sph.requests, req) + sph.mtx.Unlock() sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath) - sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken)) - sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass) + sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken)) + if sph.expectStorageClass != "*" { + sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass) + } body, err := ioutil.ReadAll(req.Body) sph.c.Check(err, Equals, nil) sph.c.Check(body, DeepEquals, []byte(sph.expectBody)) + resp.Header().Set("X-Keep-Replicas-Stored", "1") + if sph.returnStorageClasses != "" { + resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses) + } resp.WriteHeader(200) sph.handled <- fmt.Sprintf("http://%s", req.Host) } @@ -139,56 +154,162 @@ func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string, kc, _ := MakeKeepClient(arv) reader, writer := io.Pipe() - upload_status := make(chan uploadStatus) + uploadStatusChan := make(chan uploadStatus) - f(kc, ks.url, reader, writer, upload_status) + f(kc, ks.url, reader, writer, uploadStatusChan) } func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { log.Printf("TestUploadToStubKeepServer") - st := StubPutHandler{ - c, - "acbd18db4cc2f85cedef654fccc4a4d8", - "abc123", - "foo", - "hot", - make(chan string)} + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "default=1", + handled: make(chan string), + } UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) { - kc.StorageClasses = []string{"hot"} - go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), kc.getRequestID()) + func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) writer.Write([]byte("foo")) writer.Close() <-st.handled - status := <-upload_status - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) }) } func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { - st := StubPutHandler{ - c, - "acbd18db4cc2f85cedef654fccc4a4d8", - "abc123", - "foo", - "", - make(chan string)} + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "default=1", + handled: make(chan string), + } UploadToStubHelper(c, st, - func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) { - go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, kc.getRequestID()) + func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID()) <-st.handled - status := <-upload_status - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) }) } +func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) { + for _, trial := range []struct { + respHeader string + expectMap map[string]int + }{ + {"", nil}, + {"foo=1", map[string]int{"foo": 1}}, + {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}}, + {" =foo=1 ", nil}, + {"foo", nil}, + } { + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: trial.respHeader, + handled: make(chan string), + } + + UploadToStubHelper(c, st, + func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) + + writer.Write([]byte("foo")) + writer.Close() + + <-st.handled + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""}) + }) + } +} + +func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) { + nServers := 5 + for _, trial := range []struct { + replicas int + clientClasses []string + putClasses []string // putClasses takes precedence over clientClasses + minRequests int + maxRequests int + success bool + }{ + {1, []string{"class1"}, nil, 1, 1, true}, + {2, []string{"class1"}, nil, 1, 2, true}, + {3, []string{"class1"}, nil, 2, 3, true}, + {1, []string{"class1", "class2"}, nil, 1, 1, true}, + {3, nil, []string{"class1"}, 2, 3, true}, + {1, nil, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false}, + {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false}, + {1, []string{"class404"}, nil, nServers, nServers, false}, + {1, []string{"class1", "class404"}, nil, nServers, nServers, false}, + {1, nil, []string{"class1", "class404"}, nServers, nServers, false}, + } { + c.Logf("%+v", trial) + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "*", + returnStorageClasses: "class1=2, class2=2", + handled: make(chan string, 100), + } + ks := RunSomeFakeKeepServers(st, nServers) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Want_replicas = trial.replicas + kc.StorageClasses = trial.clientClasses + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + + _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Data: []byte("foo"), + StorageClasses: trial.putClasses, + }) + if trial.success { + c.Check(err, check.IsNil) + } else { + c.Check(err, check.NotNil) + } + c.Check(len(st.handled) >= trial.minRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) + c.Check(len(st.handled) <= trial.maxRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) + if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, check.Equals, true) { + // Max concurrency should be 1. First request + // should have succeeded for class1. Second + // request should only ask for class404. + c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), check.Equals, "class404") + } + } +} + type FailHandler struct { handled chan string } @@ -209,7 +330,7 @@ func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http. fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id")) if fh.count == 0 { resp.WriteHeader(500) - fh.count += 1 + fh.count++ fh.handled <- fmt.Sprintf("http://%s", req.Host) } else { fh.successhandler.ServeHTTP(resp, req) @@ -233,16 +354,16 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { UploadToStubHelper(c, st, func(kc *KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan uploadStatus) { + writer io.WriteCloser, uploadStatusChan chan uploadStatus) { - go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, kc.getRequestID()) + go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID()) writer.Write([]byte("foo")) writer.Close() <-st.handled - status := <-upload_status + status := <-uploadStatusChan c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash)) c.Check(status.statusCode, Equals, 500) }) @@ -256,7 +377,7 @@ type KeepServer struct { func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) { ks = make([]KeepServer, n) - for i := 0; i < n; i += 1 { + for i := 0; i < n; i++ { ks[i] = RunFakeKeepServer(st) } @@ -266,13 +387,15 @@ func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) { func (s *StandaloneSuite) TestPutB(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -308,13 +431,15 @@ func (s *StandaloneSuite) TestPutB(c *C) { func (s *StandaloneSuite) TestPutHR(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -357,13 +482,15 @@ func (s *StandaloneSuite) TestPutHR(c *C) { func (s *StandaloneSuite) TestPutWithFail(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 4)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 4), + } fh := FailHandler{ make(chan string, 1)} @@ -417,13 +544,15 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 1)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 1), + } fh := FailHandler{ make(chan string, 4)} @@ -456,7 +585,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, ks1[0].url) } @@ -464,14 +593,14 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { type StubGetHandler struct { c *C expectPath string - expectApiToken string + expectAPIToken string httpStatus int body []byte } func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath) - sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken)) + sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken)) resp.WriteHeader(sgh.httpStatus) resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body))) resp.Write(sgh.body) @@ -770,9 +899,9 @@ type BarHandler struct { handled chan string } -func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { resp.Write([]byte("bar")) - this.handled <- fmt.Sprintf("http://%s", req.Host) + h.handled <- fmt.Sprintf("http://%s", req.Host) } func (s *StandaloneSuite) TestChecksum(c *C) { @@ -860,9 +989,9 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) { c.Check(n, Equals, int64(3)) c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash)) - read_content, err2 := ioutil.ReadAll(r) + readContent, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) - c.Check(read_content, DeepEquals, content) + c.Check(readContent, DeepEquals, content) } func (s *ServerRequiredSuite) TestPutGetHead(c *C) { @@ -892,9 +1021,9 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { c.Check(n, Equals, int64(len(content))) c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash)) - read_content, err2 := ioutil.ReadAll(r) + readContent, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) - c.Check(read_content, DeepEquals, content) + c.Check(readContent, DeepEquals, content) } { n, url2, err := kc.Ask(hash) @@ -921,9 +1050,9 @@ type StubProxyHandler struct { handled chan string } -func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { resp.Header().Set("X-Keep-Replicas-Stored", "2") - this.handled <- fmt.Sprintf("http://%s", req.Host) + h.handled <- fmt.Sprintf("http://%s", req.Host) } func (s *StandaloneSuite) TestPutProxy(c *C) { @@ -979,7 +1108,7 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) { _, replicas, err := kc.PutB([]byte("foo")) <-st.handled - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 2) } @@ -1025,13 +1154,15 @@ func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) { func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -1055,7 +1186,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)]) @@ -1064,13 +1195,15 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -1091,7 +1224,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 0) } @@ -1240,13 +1373,16 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) { func (s *StandaloneSuite) TestPutBRetry(c *C) { st := &FailThenSucceedHandler{ handled: make(chan string, 1), - successhandler: StubPutHandler{ - c, - Md5String("foo"), - "abc123", - "foo", - "", - make(chan string, 5)}} + successhandler: &StubPutHandler{ + c: c, + expectPath: Md5String("foo"), + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 5), + }, + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv)