X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3aaefcb3c76ff470b475d950398d01255e87712a..72d7d41944006d1f48f570784dafe56b9812b0c8:/sdk/go/keepclient/keepclient_test.go diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index 57a89b50aa..a6e0a11d51 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -6,8 +6,8 @@ package keepclient import ( "bytes" + "context" "crypto/md5" - "errors" "fmt" "io" "io/ioutil" @@ -16,9 +16,11 @@ import ( "net/http" "os" "strings" + "sync" "testing" "time" + "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/arvadostest" . "gopkg.in/check.v1" @@ -49,13 +51,11 @@ func pythonDir() string { } func (s *ServerRequiredSuite) SetUpSuite(c *C) { - arvadostest.StartAPI() arvadostest.StartKeep(2, false) } func (s *ServerRequiredSuite) TearDownSuite(c *C) { arvadostest.StopKeep(2) - arvadostest.StopAPI() } func (s *ServerRequiredSuite) SetUpTest(c *C) { @@ -75,9 +75,22 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) { } } +func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) { + arv, err := arvadosclient.MakeArvadosClient() + c.Assert(err, IsNil) + + cc, err := arv.ClusterConfig("StorageClasses") + c.Assert(err, IsNil) + c.Assert(cc, NotNil) + c.Assert(cc.(map[string]interface{})["default"], NotNil) + + kc := New(arv) + c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"}) +} + func (s *ServerRequiredSuite) TestDefaultReplications(c *C) { arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) kc, err := MakeKeepClient(arv) c.Check(err, IsNil) @@ -95,21 +108,33 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) { } type StubPutHandler struct { - c *C - expectPath string - expectAPIToken string - expectBody string - expectStorageClass string - handled chan string + c *C + expectPath string + expectAPIToken string + expectBody string + expectStorageClass string + returnStorageClasses string + handled chan string + requests []*http.Request + mtx sync.Mutex } -func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + sph.mtx.Lock() + sph.requests = append(sph.requests, req) + sph.mtx.Unlock() sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath) sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken)) - sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass) + if sph.expectStorageClass != "*" { + sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass) + } body, err := ioutil.ReadAll(req.Body) sph.c.Check(err, Equals, nil) sph.c.Check(body, DeepEquals, []byte(sph.expectBody)) + resp.Header().Set("X-Keep-Replicas-Stored", "1") + if sph.returnStorageClasses != "" { + resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses) + } resp.WriteHeader(200) sph.handled <- fmt.Sprintf("http://%s", req.Host) } @@ -120,7 +145,7 @@ func RunFakeKeepServer(st http.Handler) (ks KeepServer) { // bind to 0.0.0.0 or [::] which is not a valid address for Dial() ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0}) if err != nil { - panic(fmt.Sprintf("Could not listen on any port")) + panic("Could not listen on any port") } ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String()) go http.Serve(ks.listener, st) @@ -147,48 +172,234 @@ func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string, func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { log.Printf("TestUploadToStubKeepServer") - st := StubPutHandler{ - c, - "acbd18db4cc2f85cedef654fccc4a4d8", - "abc123", - "foo", - "hot", - make(chan string)} + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "default=1", + handled: make(chan string), + } UploadToStubHelper(c, st, func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) { - kc.StorageClasses = []string{"hot"} - go kc.uploadToKeepServer(url, st.expectPath, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID()) + go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) writer.Write([]byte("foo")) writer.Close() <-st.handled status := <-uploadStatusChan - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) }) } func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { - st := StubPutHandler{ - c, - "acbd18db4cc2f85cedef654fccc4a4d8", - "abc123", - "foo", - "", - make(chan string)} + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "default=1", + handled: make(chan string), + } UploadToStubHelper(c, st, func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) { - go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID()) + go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID()) <-st.handled status := <-uploadStatusChan - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) }) } +func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) { + for _, trial := range []struct { + respHeader string + expectMap map[string]int + }{ + {"", nil}, + {"foo=1", map[string]int{"foo": 1}}, + {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}}, + {" =foo=1 ", nil}, + {"foo", nil}, + } { + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: trial.respHeader, + handled: make(chan string), + } + + UploadToStubHelper(c, st, + func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) + + writer.Write([]byte("foo")) + writer.Close() + + <-st.handled + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""}) + }) + } +} + +func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) { + nServers := 5 + for _, trial := range []struct { + replicas int + clientClasses []string + putClasses []string + minRequests int + maxRequests int + success bool + }{ + // Talking to an older cluster (no default storage classes exported + // config) and no other additional storage classes requirements. + {1, nil, nil, 1, 1, true}, + {2, nil, nil, 2, 2, true}, + {3, nil, nil, 3, 3, true}, + {nServers*2 + 1, nil, nil, nServers, nServers, false}, + + {1, []string{"class1"}, nil, 1, 1, true}, + {2, []string{"class1"}, nil, 2, 2, true}, + {3, []string{"class1"}, nil, 3, 3, true}, + {1, []string{"class1", "class2"}, nil, 1, 1, true}, + {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false}, + + {1, nil, []string{"class1"}, 1, 1, true}, + {2, nil, []string{"class1"}, 2, 2, true}, + {3, nil, []string{"class1"}, 3, 3, true}, + {1, nil, []string{"class1", "class2"}, 1, 1, true}, + {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false}, + } { + c.Logf("%+v", trial) + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "*", + returnStorageClasses: "", // Simulate old cluster without SC keep support + handled: make(chan string, 100), + } + ks := RunSomeFakeKeepServers(st, nServers) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Want_replicas = trial.replicas + kc.StorageClasses = trial.clientClasses + kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + + _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Data: []byte("foo"), + StorageClasses: trial.putClasses, + }) + if trial.success { + c.Check(err, IsNil) + } else { + c.Check(err, NotNil) + } + c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) + c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) + if trial.clientClasses == nil && trial.putClasses == nil { + c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "") + } + } +} + +func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) { + nServers := 5 + for _, trial := range []struct { + replicas int + defaultClasses []string + clientClasses []string // clientClasses takes precedence over defaultClasses + putClasses []string // putClasses takes precedence over clientClasses + minRequests int + maxRequests int + success bool + }{ + {1, []string{"class1"}, nil, nil, 1, 1, true}, + {2, []string{"class1"}, nil, nil, 1, 2, true}, + {3, []string{"class1"}, nil, nil, 2, 3, true}, + {1, []string{"class1", "class2"}, nil, nil, 1, 1, true}, + + // defaultClasses doesn't matter when any of the others is specified. + {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true}, + {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true}, + {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true}, + {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true}, + {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true}, + {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false}, + {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false}, + } { + c.Logf("%+v", trial) + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "*", + returnStorageClasses: "class1=2, class2=2", + handled: make(chan string, 100), + } + ks := RunSomeFakeKeepServers(st, nServers) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Want_replicas = trial.replicas + kc.StorageClasses = trial.clientClasses + kc.DefaultStorageClasses = trial.defaultClasses + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + + _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Data: []byte("foo"), + StorageClasses: trial.putClasses, + }) + if trial.success { + c.Check(err, IsNil) + } else { + c.Check(err, NotNil) + } + c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) + c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) + if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) { + // Max concurrency should be 1. First request + // should have succeeded for class1. Second + // request should only ask for class404. + c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404") + } + } +} + type FailHandler struct { handled chan string } @@ -235,7 +446,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) { - go kc.uploadToKeepServer(url, hash, reader, uploadStatusChan, 3, kc.getRequestID()) + go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID()) writer.Write([]byte("foo")) writer.Close() @@ -266,13 +477,15 @@ func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) { func (s *StandaloneSuite) TestPutB(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -308,13 +521,15 @@ func (s *StandaloneSuite) TestPutB(c *C) { func (s *StandaloneSuite) TestPutHR(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -357,13 +572,15 @@ func (s *StandaloneSuite) TestPutHR(c *C) { func (s *StandaloneSuite) TestPutWithFail(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 4)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 4), + } fh := FailHandler{ make(chan string, 1)} @@ -417,13 +634,15 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 1)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 1), + } fh := FailHandler{ make(chan string, 4)} @@ -456,7 +675,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, ks1[0].url) } @@ -979,7 +1198,7 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) { _, replicas, err := kc.PutB([]byte("foo")) <-st.handled - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 2) } @@ -1025,13 +1244,15 @@ func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) { func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -1055,7 +1276,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)]) @@ -1064,13 +1285,15 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - "", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv) @@ -1091,7 +1314,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 0) } @@ -1240,13 +1463,16 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) { func (s *StandaloneSuite) TestPutBRetry(c *C) { st := &FailThenSucceedHandler{ handled: make(chan string, 1), - successhandler: StubPutHandler{ - c, - Md5String("foo"), - "abc123", - "foo", - "", - make(chan string, 5)}} + successhandler: &StubPutHandler{ + c: c, + expectPath: Md5String("foo"), + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + }, + } arv, _ := arvadosclient.MakeArvadosClient() kc, _ := MakeKeepClient(arv)