X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9bc0194676e3f22b41976fefc6146d7dd965d173..6fc3e8d44c1bf825f7c3727bab1fef81d2518288:/sdk/go/keepclient/keepclient_test.go diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index 4615ebc92e..a6e0a11d51 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -1,13 +1,14 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package keepclient import ( + "bytes" + "context" "crypto/md5" - "flag" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/arvadostest" - "git.curoverse.com/arvados.git/sdk/go/streamer" - . "gopkg.in/check.v1" "io" "io/ioutil" "log" @@ -15,7 +16,14 @@ import ( "net/http" "os" "strings" + "sync" "testing" + "time" + + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/arvadostest" + . "gopkg.in/check.v1" ) // Gocheck boilerplate @@ -27,41 +35,38 @@ func Test(t *testing.T) { var _ = Suite(&ServerRequiredSuite{}) var _ = Suite(&StandaloneSuite{}) -var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'") - // Tests that require the Keep server running type ServerRequiredSuite struct{} // Standalone tests type StandaloneSuite struct{} +func (s *StandaloneSuite) SetUpTest(c *C) { + RefreshServiceDiscovery() +} + func pythonDir() string { cwd, _ := os.Getwd() return fmt.Sprintf("%s/../../python/tests", cwd) } func (s *ServerRequiredSuite) SetUpSuite(c *C) { - if *no_server { - c.Skip("Skipping tests that require server") - return - } - arvadostest.StartAPI() arvadostest.StartKeep(2, false) } func (s *ServerRequiredSuite) TearDownSuite(c *C) { - if *no_server { - return - } arvadostest.StopKeep(2) - arvadostest.StopAPI() +} + +func (s *ServerRequiredSuite) SetUpTest(c *C) { + RefreshServiceDiscovery() } func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) { arv, err := arvadosclient.MakeArvadosClient() c.Assert(err, Equals, nil) - kc, err := MakeKeepClient(&arv) + kc, err := MakeKeepClient(arv) c.Assert(err, Equals, nil) c.Check(len(kc.LocalRoots()), Equals, 2) @@ -70,45 +75,77 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) { } } +func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) { + arv, err := arvadosclient.MakeArvadosClient() + c.Assert(err, IsNil) + + cc, err := arv.ClusterConfig("StorageClasses") + c.Assert(err, IsNil) + c.Assert(cc, NotNil) + c.Assert(cc.(map[string]interface{})["default"], NotNil) + + kc := New(arv) + c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"}) +} + func (s *ServerRequiredSuite) TestDefaultReplications(c *C) { arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) + c.Assert(err, IsNil) - kc, err := MakeKeepClient(&arv) + kc, err := MakeKeepClient(arv) + c.Check(err, IsNil) c.Assert(kc.Want_replicas, Equals, 2) arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0 - kc, err = MakeKeepClient(&arv) + kc, err = MakeKeepClient(arv) + c.Check(err, IsNil) c.Assert(kc.Want_replicas, Equals, 3) arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0 - kc, err = MakeKeepClient(&arv) + kc, err = MakeKeepClient(arv) + c.Check(err, IsNil) c.Assert(kc.Want_replicas, Equals, 1) } type StubPutHandler struct { - c *C - expectPath string - expectApiToken string - expectBody string - handled chan string + c *C + expectPath string + expectAPIToken string + expectBody string + expectStorageClass string + returnStorageClasses string + handled chan string + requests []*http.Request + mtx sync.Mutex } -func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + sph.mtx.Lock() + sph.requests = append(sph.requests, req) + sph.mtx.Unlock() sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath) - sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken)) + sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken)) + if sph.expectStorageClass != "*" { + sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass) + } body, err := ioutil.ReadAll(req.Body) sph.c.Check(err, Equals, nil) sph.c.Check(body, DeepEquals, []byte(sph.expectBody)) + resp.Header().Set("X-Keep-Replicas-Stored", "1") + if sph.returnStorageClasses != "" { + resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses) + } resp.WriteHeader(200) sph.handled <- fmt.Sprintf("http://%s", req.Host) } func RunFakeKeepServer(st http.Handler) (ks KeepServer) { var err error - ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0}) + // If we don't explicitly bind it to localhost, ks.listener.Addr() will + // bind to 0.0.0.0 or [::] which is not a valid address for Dial() + ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0}) if err != nil { - panic(fmt.Sprintf("Could not listen on any port")) + panic("Could not listen on any port") } ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String()) go http.Serve(ks.listener, st) @@ -124,71 +161,243 @@ func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string, arv, _ := arvadosclient.MakeArvadosClient() arv.ApiToken = "abc123" - kc, _ := MakeKeepClient(&arv) + kc, _ := MakeKeepClient(arv) reader, writer := io.Pipe() - upload_status := make(chan uploadStatus) + uploadStatusChan := make(chan uploadStatus) - f(kc, ks.url, reader, writer, upload_status) + f(kc, ks.url, reader, writer, uploadStatusChan) } func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { log.Printf("TestUploadToStubKeepServer") - st := StubPutHandler{ - c, - "acbd18db4cc2f85cedef654fccc4a4d8", - "abc123", - "foo", - make(chan string)} + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "default=1", + handled: make(chan string), + } UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) { - - go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), 0) + func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) writer.Write([]byte("foo")) writer.Close() <-st.handled - status := <-upload_status - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) }) - - log.Printf("TestUploadToStubKeepServer done") } func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { - log.Printf("TestUploadToStubKeepServerBufferReader") - - st := StubPutHandler{ - c, - "acbd18db4cc2f85cedef654fccc4a4d8", - "abc123", - "foo", - make(chan string)} + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "default=1", + handled: make(chan string), + } UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan uploadStatus) { + func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID()) - tr := streamer.AsyncStreamFromReader(512, reader) - defer tr.Close() + <-st.handled - br1 := tr.MakeStreamReader() + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) + }) +} - go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0) +func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) { + for _, trial := range []struct { + respHeader string + expectMap map[string]int + }{ + {"", nil}, + {"foo=1", map[string]int{"foo": 1}}, + {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}}, + {" =foo=1 ", nil}, + {"foo", nil}, + } { + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: trial.respHeader, + handled: make(chan string), + } - writer.Write([]byte("foo")) - writer.Close() + UploadToStubHelper(c, st, + func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) - <-st.handled + writer.Write([]byte("foo")) + writer.Close() + + <-st.handled + status := <-uploadStatusChan + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""}) + }) + } +} - status := <-upload_status - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) +func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) { + nServers := 5 + for _, trial := range []struct { + replicas int + clientClasses []string + putClasses []string + minRequests int + maxRequests int + success bool + }{ + // Talking to an older cluster (no default storage classes exported + // config) and no other additional storage classes requirements. + {1, nil, nil, 1, 1, true}, + {2, nil, nil, 2, 2, true}, + {3, nil, nil, 3, 3, true}, + {nServers*2 + 1, nil, nil, nServers, nServers, false}, + + {1, []string{"class1"}, nil, 1, 1, true}, + {2, []string{"class1"}, nil, 2, 2, true}, + {3, []string{"class1"}, nil, 3, 3, true}, + {1, []string{"class1", "class2"}, nil, 1, 1, true}, + {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false}, + + {1, nil, []string{"class1"}, 1, 1, true}, + {2, nil, []string{"class1"}, 2, 2, true}, + {3, nil, []string{"class1"}, 3, 3, true}, + {1, nil, []string{"class1", "class2"}, 1, 1, true}, + {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false}, + } { + c.Logf("%+v", trial) + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "*", + returnStorageClasses: "", // Simulate old cluster without SC keep support + handled: make(chan string, 100), + } + ks := RunSomeFakeKeepServers(st, nServers) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Want_replicas = trial.replicas + kc.StorageClasses = trial.clientClasses + kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) + + _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Data: []byte("foo"), + StorageClasses: trial.putClasses, }) + if trial.success { + c.Check(err, IsNil) + } else { + c.Check(err, NotNil) + } + c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) + c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) + if trial.clientClasses == nil && trial.putClasses == nil { + c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "") + } + } +} + +func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) { + nServers := 5 + for _, trial := range []struct { + replicas int + defaultClasses []string + clientClasses []string // clientClasses takes precedence over defaultClasses + putClasses []string // putClasses takes precedence over clientClasses + minRequests int + maxRequests int + success bool + }{ + {1, []string{"class1"}, nil, nil, 1, 1, true}, + {2, []string{"class1"}, nil, nil, 1, 2, true}, + {3, []string{"class1"}, nil, nil, 2, 3, true}, + {1, []string{"class1", "class2"}, nil, nil, 1, 1, true}, + + // defaultClasses doesn't matter when any of the others is specified. + {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true}, + {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true}, + {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true}, + {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true}, + {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true}, + {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true}, + {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false}, + {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false}, + {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false}, + } { + c.Logf("%+v", trial) + st := &StubPutHandler{ + c: c, + expectPath: "acbd18db4cc2f85cedef654fccc4a4d8", + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "*", + returnStorageClasses: "class1=2, class2=2", + handled: make(chan string, 100), + } + ks := RunSomeFakeKeepServers(st, nServers) + arv, _ := arvadosclient.MakeArvadosClient() + kc, _ := MakeKeepClient(arv) + kc.Want_replicas = trial.replicas + kc.StorageClasses = trial.clientClasses + kc.DefaultStorageClasses = trial.defaultClasses + arv.ApiToken = "abc123" + localRoots := make(map[string]string) + writableLocalRoots := make(map[string]string) + for i, k := range ks { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + defer k.listener.Close() + } + kc.SetServiceRoots(localRoots, writableLocalRoots, nil) - log.Printf("TestUploadToStubKeepServerBufferReader done") + _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Data: []byte("foo"), + StorageClasses: trial.putClasses, + }) + if trial.success { + c.Check(err, IsNil) + } else { + c.Check(err, NotNil) + } + c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests)) + c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests)) + if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) { + // Max concurrency should be 1. First request + // should have succeeded for class1. Second + // request should only ask for class404. + c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404") + } + } } type FailHandler struct { @@ -203,13 +412,15 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { type FailThenSucceedHandler struct { handled chan string count int - successhandler StubGetHandler + successhandler http.Handler + reqIDs []string } func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id")) if fh.count == 0 { resp.WriteHeader(500) - fh.count += 1 + fh.count++ fh.handled <- fmt.Sprintf("http://%s", req.Host) } else { fh.successhandler.ServeHTTP(resp, req) @@ -226,8 +437,6 @@ func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) } func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { - log.Printf("TestFailedUploadToStubKeepServer") - st := FailHandler{ make(chan string)} @@ -235,20 +444,19 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { UploadToStubHelper(c, st, func(kc *KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan uploadStatus) { + writer io.WriteCloser, uploadStatusChan chan uploadStatus) { - go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, 0) + go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID()) writer.Write([]byte("foo")) writer.Close() <-st.handled - status := <-upload_status + status := <-uploadStatusChan c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash)) c.Check(status.statusCode, Equals, 500) }) - log.Printf("TestFailedUploadToStubKeepServer done") } type KeepServer struct { @@ -259,7 +467,7 @@ type KeepServer struct { func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) { ks = make([]KeepServer, n) - for i := 0; i < n; i += 1 { + for i := 0; i < n; i++ { ks[i] = RunFakeKeepServer(st) } @@ -267,19 +475,20 @@ func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) { } func (s *StandaloneSuite) TestPutB(c *C) { - log.Printf("TestPutB") - hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + kc, _ := MakeKeepClient(arv) kc.Want_replicas = 2 arv.ApiToken = "abc123" @@ -307,24 +516,23 @@ func (s *StandaloneSuite) TestPutB(c *C) { (s1 == shuff[1] && s2 == shuff[0]), Equals, true) - - log.Printf("TestPutB done") } func (s *StandaloneSuite) TestPutHR(c *C) { - log.Printf("TestPutHR") - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + kc, _ := MakeKeepClient(arv) kc.Want_replicas = 2 arv.ApiToken = "abc123" @@ -351,7 +559,6 @@ func (s *StandaloneSuite) TestPutHR(c *C) { kc.PutHR(hash, reader, 3) shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots() - log.Print(shuff) s1 := <-st.handled s2 := <-st.handled @@ -360,27 +567,27 @@ func (s *StandaloneSuite) TestPutHR(c *C) { (s1 == shuff[1] && s2 == shuff[0]), Equals, true) - - log.Printf("TestPutHR done") } func (s *StandaloneSuite) TestPutWithFail(c *C) { - log.Printf("TestPutWithFail") - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - make(chan string, 4)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 4), + } fh := FailHandler{ make(chan string, 1)} arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) kc.Want_replicas = 2 arv.ApiToken = "abc123" @@ -405,6 +612,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { shuff := NewRootSorter( kc.LocalRoots(), Md5String("foo")).GetSortedRoots() + c.Logf("%+v", shuff) phash, replicas, err := kc.PutB([]byte("foo")) @@ -424,22 +632,24 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { } func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { - log.Printf("TestPutWithTooManyFail") - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - make(chan string, 1)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 1), + } fh := FailHandler{ make(chan string, 4)} arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) kc.Want_replicas = 2 kc.Retries = 0 @@ -465,32 +675,28 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, Equals, InsufficientReplicasError) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, ks1[0].url) - - log.Printf("TestPutWithTooManyFail done") } type StubGetHandler struct { c *C expectPath string - expectApiToken string + expectAPIToken string httpStatus int body []byte } func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath) - sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken)) + sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken)) resp.WriteHeader(sgh.httpStatus) resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body))) resp.Write(sgh.body) } func (s *StandaloneSuite) TestGet(c *C) { - log.Printf("TestGet") - hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) st := StubGetHandler{ @@ -504,7 +710,8 @@ func (s *StandaloneSuite) TestGet(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) @@ -517,8 +724,6 @@ func (s *StandaloneSuite) TestGet(c *C) { content, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) c.Check(content, DeepEquals, []byte("foo")) - - log.Printf("TestGet done") } func (s *StandaloneSuite) TestGet404(c *C) { @@ -530,7 +735,8 @@ func (s *StandaloneSuite) TestGet404(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) @@ -541,6 +747,28 @@ func (s *StandaloneSuite) TestGet404(c *C) { c.Check(r, Equals, nil) } +func (s *StandaloneSuite) TestGetEmptyBlock(c *C) { + st := Error404Handler{make(chan string, 1)} + + ks := RunFakeKeepServer(st) + defer ks.listener.Close() + + arv, err := arvadosclient.MakeArvadosClient() + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) + arv.ApiToken = "abc123" + kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) + + r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0") + c.Check(err, IsNil) + c.Check(n, Equals, int64(0)) + c.Check(url2, Equals, "") + c.Assert(r, NotNil) + buf, err := ioutil.ReadAll(r) + c.Check(err, IsNil) + c.Check(buf, DeepEquals, []byte{}) +} + func (s *StandaloneSuite) TestGetFail(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) @@ -550,7 +778,8 @@ func (s *StandaloneSuite) TestGetFail(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) kc.Retries = 0 @@ -568,8 +797,9 @@ func (s *StandaloneSuite) TestGetFail(c *C) { func (s *StandaloneSuite) TestGetFailRetry(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - st := &FailThenSucceedHandler{make(chan string, 1), 0, - StubGetHandler{ + st := &FailThenSucceedHandler{ + handled: make(chan string, 1), + successhandler: StubGetHandler{ c, hash, "abc123", @@ -580,7 +810,8 @@ func (s *StandaloneSuite) TestGetFailRetry(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) @@ -593,13 +824,21 @@ func (s *StandaloneSuite) TestGetFailRetry(c *C) { content, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) c.Check(content, DeepEquals, []byte("foo")) + + c.Logf("%q", st.reqIDs) + c.Assert(len(st.reqIDs) > 1, Equals, true) + for _, reqid := range st.reqIDs { + c.Check(reqid, Not(Equals), "") + c.Check(reqid, Equals, st.reqIDs[0]) + } } func (s *StandaloneSuite) TestGetNetError(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil) @@ -635,7 +874,8 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots( map[string]string{"x": ks0.url}, @@ -678,20 +918,21 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots( map[string]string{ "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url, "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url, "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url, - uuid: ks.url}, + uuid: ks.url}, nil, map[string]string{ "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url, "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url, "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url, - uuid: ks.url}, + uuid: ks.url}, ) r, n, uri, err := kc.Get(hash + "+K@" + uuid) @@ -725,7 +966,8 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) { defer ksGateway.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots( map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url}, @@ -747,9 +989,9 @@ type BarHandler struct { handled chan string } -func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { resp.Write([]byte("bar")) - this.handled <- fmt.Sprintf("http://%s", req.Host) + h.handled <- fmt.Sprintf("http://%s", req.Host) } func (s *StandaloneSuite) TestChecksum(c *C) { @@ -762,11 +1004,13 @@ func (s *StandaloneSuite) TestChecksum(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, n, _, err := kc.Get(barhash) + c.Check(err, IsNil) _, err = ioutil.ReadAll(r) c.Check(n, Equals, int64(3)) c.Check(err, Equals, nil) @@ -774,6 +1018,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) { <-st.handled r, n, _, err = kc.Get(foohash) + c.Check(err, IsNil) _, err = ioutil.ReadAll(r) c.Check(n, Equals, int64(3)) c.Check(err, Equals, BadChecksum) @@ -796,7 +1041,8 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) { content} arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" localRoots := make(map[string]string) writableLocalRoots := make(map[string]string) @@ -833,16 +1079,17 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) { c.Check(n, Equals, int64(3)) c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash)) - read_content, err2 := ioutil.ReadAll(r) + readContent, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) - c.Check(read_content, DeepEquals, content) + c.Check(readContent, DeepEquals, content) } func (s *ServerRequiredSuite) TestPutGetHead(c *C) { content := []byte("TestPutGetHead") arv, err := arvadosclient.MakeArvadosClient() - kc, err := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, err := MakeKeepClient(arv) c.Assert(err, Equals, nil) hash := fmt.Sprintf("%x", md5.Sum(content)) @@ -864,9 +1111,9 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { c.Check(n, Equals, int64(len(content))) c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash)) - read_content, err2 := ioutil.ReadAll(r) + readContent, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) - c.Check(read_content, DeepEquals, content) + c.Check(readContent, DeepEquals, content) } { n, url2, err := kc.Ask(hash) @@ -874,27 +1121,38 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { c.Check(n, Equals, int64(len(content))) c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash)) } + { + loc, err := kc.LocalLocator(hash) + c.Check(err, Equals, nil) + c.Assert(len(loc) >= 32, Equals, true) + c.Check(loc[:32], Equals, hash[:32]) + } + { + content := []byte("the perth county conspiracy") + loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content))) + c.Check(loc, Equals, "") + c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`) + c.Check(err, ErrorMatches, `.*HTTP 400.*`) + } } type StubProxyHandler struct { handled chan string } -func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { resp.Header().Set("X-Keep-Replicas-Stored", "2") - this.handled <- fmt.Sprintf("http://%s", req.Host) + h.handled <- fmt.Sprintf("http://%s", req.Host) } func (s *StandaloneSuite) TestPutProxy(c *C) { - log.Printf("TestPutProxy") - st := StubProxyHandler{make(chan string, 1)} arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) kc.Want_replicas = 2 - kc.Using_proxy = true arv.ApiToken = "abc123" localRoots := make(map[string]string) writableLocalRoots := make(map[string]string) @@ -914,20 +1172,16 @@ func (s *StandaloneSuite) TestPutProxy(c *C) { c.Check(err, Equals, nil) c.Check(replicas, Equals, 2) - - log.Printf("TestPutProxy done") } func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) { - log.Printf("TestPutProxy") - st := StubProxyHandler{make(chan string, 1)} arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) kc.Want_replicas = 3 - kc.Using_proxy = true arv.ApiToken = "abc123" localRoots := make(map[string]string) writableLocalRoots := make(map[string]string) @@ -944,10 +1198,8 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) { _, replicas, err := kc.PutB([]byte("foo")) <-st.handled - c.Check(err, Equals, InsufficientReplicasError) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 2) - - log.Printf("TestPutProxy done") } func (s *StandaloneSuite) TestMakeLocator(c *C) { @@ -992,15 +1244,18 @@ func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) { func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + kc, _ := MakeKeepClient(arv) kc.Want_replicas = 2 arv.ApiToken = "abc123" @@ -1021,7 +1276,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, Equals, InsufficientReplicasError) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)]) @@ -1030,15 +1285,18 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) { hash := Md5String("foo") - st := StubPutHandler{ - c, - hash, - "abc123", - "foo", - make(chan string, 5)} + st := &StubPutHandler{ + c: c, + expectPath: hash, + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "", + returnStorageClasses: "", + handled: make(chan string, 5), + } arv, _ := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + kc, _ := MakeKeepClient(arv) kc.Want_replicas = 2 arv.ApiToken = "abc123" @@ -1056,7 +1314,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, Equals, InsufficientReplicasError) + c.Check(err, FitsTypeOf, InsufficientReplicasError{}) c.Check(replicas, Equals, 0) } @@ -1090,12 +1348,14 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Assert(err, IsNil) + kc, err := MakeKeepClient(arv) + c.Assert(err, IsNil) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", "") - c.Check(err, Equals, nil) + c.Check(err, IsNil) content, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) @@ -1116,12 +1376,13 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", hash[0:3]) - c.Check(err, Equals, nil) + c.Assert(err, Equals, nil) content, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) @@ -1142,7 +1403,8 @@ func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) @@ -1164,7 +1426,8 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) @@ -1184,7 +1447,8 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + c.Check(err, IsNil) + kc, _ := MakeKeepClient(arv) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) @@ -1196,33 +1460,22 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) { c.Check(content, DeepEquals, st.body[0:len(st.body)-1]) } -type FailThenSucceedPutHandler struct { - handled chan string - count int - successhandler StubPutHandler -} - -func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - if h.count == 0 { - resp.WriteHeader(500) - h.count += 1 - h.handled <- fmt.Sprintf("http://%s", req.Host) - } else { - h.successhandler.ServeHTTP(resp, req) - } -} - func (s *StandaloneSuite) TestPutBRetry(c *C) { - st := &FailThenSucceedPutHandler{make(chan string, 1), 0, - StubPutHandler{ - c, - Md5String("foo"), - "abc123", - "foo", - make(chan string, 5)}} + st := &FailThenSucceedHandler{ + handled: make(chan string, 1), + successhandler: &StubPutHandler{ + c: c, + expectPath: Md5String("foo"), + expectAPIToken: "abc123", + expectBody: "foo", + expectStorageClass: "default", + returnStorageClasses: "", + handled: make(chan string, 5), + }, + } arv, _ := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(&arv) + kc, _ := MakeKeepClient(arv) kc.Want_replicas = 2 arv.ApiToken = "abc123" @@ -1245,3 +1498,49 @@ func (s *StandaloneSuite) TestPutBRetry(c *C) { c.Check(hash, Equals, "") c.Check(replicas, Equals, 2) } + +func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { + arv, err := arvadosclient.MakeArvadosClient() + c.Assert(err, Equals, nil) + + // Add an additional "testblobstore" keepservice + blobKeepService := make(arvadosclient.Dict) + err = arv.Create("keep_services", + arvadosclient.Dict{"keep_service": arvadosclient.Dict{ + "service_host": "localhost", + "service_port": "21321", + "service_type": "testblobstore"}}, + &blobKeepService) + c.Assert(err, Equals, nil) + defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }() + RefreshServiceDiscovery() + + // Make a keepclient and ensure that the testblobstore is included + kc, err := MakeKeepClient(arv) + c.Assert(err, Equals, nil) + + // verify kc.LocalRoots + c.Check(len(kc.LocalRoots()), Equals, 3) + for _, root := range kc.LocalRoots() { + c.Check(root, Matches, "http://localhost:\\d+") + } + c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "") + + // verify kc.GatewayRoots + c.Check(len(kc.GatewayRoots()), Equals, 3) + for _, root := range kc.GatewayRoots() { + c.Check(root, Matches, "http://localhost:\\d+") + } + c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "") + + // verify kc.WritableLocalRoots + c.Check(len(kc.WritableLocalRoots()), Equals, 3) + for _, root := range kc.WritableLocalRoots() { + c.Check(root, Matches, "http://localhost:\\d+") + } + c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "") + + c.Assert(kc.replicasPerService, Equals, 0) + c.Assert(kc.foundNonDiskSvc, Equals, true) + c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second) +}