X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4f5a6df52559b90d2c9412624f3c4c7fbe467579..7ed2f9d875d69b1494372c0cb790b18187dbacf2:/sdk/go/keepclient/collectionreader_test.go diff --git a/sdk/go/keepclient/collectionreader_test.go b/sdk/go/keepclient/collectionreader_test.go index 51710b7bea..58a047c55a 100644 --- a/sdk/go/keepclient/collectionreader_test.go +++ b/sdk/go/keepclient/collectionreader_test.go @@ -2,26 +2,53 @@ package keepclient import ( "crypto/md5" + "crypto/rand" "fmt" "io" "io/ioutil" "net/http" "os" + "strconv" + "strings" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/arvadostest" check "gopkg.in/check.v1" ) -var _ = check.Suite(&IntegrationSuite{}) +var _ = check.Suite(&CollectionReaderUnit{}) -// IntegrationSuite tests need an API server -type IntegrationSuite struct{} +type CollectionReaderUnit struct { + arv arvadosclient.ArvadosClient + kc *KeepClient + handler SuccessHandler +} + +func (s *CollectionReaderUnit) SetUpTest(c *check.C) { + var err error + s.arv, err = arvadosclient.MakeArvadosClient() + c.Assert(err, check.IsNil) + s.arv.ApiToken = arvadostest.ActiveToken + + s.kc, err = MakeKeepClient(&s.arv) + c.Assert(err, check.IsNil) + + s.handler = SuccessHandler{ + disk: make(map[string][]byte), + lock: make(chan struct{}, 1), + ops: new(int), + } + localRoots := make(map[string]string) + for i, k := range RunSomeFakeKeepServers(s.handler, 4) { + localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url + } + s.kc.SetServiceRoots(localRoots, localRoots, nil) +} type SuccessHandler struct { disk map[string][]byte - lock chan struct{} // channel with buffer==1: full when an operation is in progress. - ops *int // number of operations completed + lock chan struct{} // channel with buffer==1: full when an operation is in progress. + ops *int // number of operations completed } func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { @@ -38,7 +65,7 @@ func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { if h.ops != nil { (*h.ops)++ } - <- h.lock + <-h.lock resp.Write([]byte(pdh)) case "GET": pdh := req.URL.Path[1:] @@ -47,7 +74,7 @@ func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { if h.ops != nil { (*h.ops)++ } - <- h.lock + <-h.lock if !ok { resp.WriteHeader(http.StatusNotFound) } else { @@ -64,33 +91,11 @@ type rdrTest struct { want interface{} // error or string to expect } -func StubWithFakeServers(kc *KeepClient, h http.Handler) { - localRoots := make(map[string]string) - for i, k := range RunSomeFakeKeepServers(h, 4) { - localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url - } - kc.SetServiceRoots(localRoots, localRoots, nil) -} - -func (s *ServerRequiredSuite) TestCollectionReaderContent(c *check.C) { - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, check.IsNil) - arv.ApiToken = arvadostest.ActiveToken - - kc, err := MakeKeepClient(&arv) - c.Assert(err, check.IsNil) - - { - h := SuccessHandler{ - disk: make(map[string][]byte), - lock: make(chan struct{}, 1), - } - StubWithFakeServers(kc, h) - kc.PutB([]byte("foo")) - kc.PutB([]byte("bar")) - kc.PutB([]byte("Hello world\n")) - kc.PutB([]byte("")) - } +func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) { + s.kc.PutB([]byte("foo")) + s.kc.PutB([]byte("bar")) + s.kc.PutB([]byte("Hello world\n")) + s.kc.PutB([]byte("")) mt := arvadostest.PathologicalManifest @@ -116,7 +121,7 @@ func (s *ServerRequiredSuite) TestCollectionReaderContent(c *check.C) { {mt: mt, f: "segmented/frob", want: "frob"}, {mt: mt, f: "segmented/oof", want: "oof"}, } { - rdr, err := kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f) + rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f) switch want := testCase.want.(type) { case error: c.Check(rdr, check.IsNil) @@ -136,21 +141,34 @@ func (s *ServerRequiredSuite) TestCollectionReaderContent(c *check.C) { } } -func (s *ServerRequiredSuite) TestCollectionReaderCloseEarly(c *check.C) { - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, check.IsNil) - arv.ApiToken = arvadostest.ActiveToken - - kc, err := MakeKeepClient(&arv) - c.Assert(err, check.IsNil) - - h := SuccessHandler{ - disk: make(map[string][]byte), - lock: make(chan struct{}, 1), - ops: new(int), +func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) { + h := md5.New() + buf := make([]byte, 4096) + locs := make([]string, len(buf)) + filesize := 0 + for i := 0; i < len(locs); i++ { + _, err := io.ReadFull(rand.Reader, buf[:i]) + c.Assert(err, check.IsNil) + h.Write(buf[:i]) + locs[i], _, err = s.kc.PutB(buf[:i]) + c.Assert(err, check.IsNil) + filesize += i } - StubWithFakeServers(kc, h) - kc.PutB([]byte("foo")) + manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n" + dataMD5 := h.Sum(nil) + + checkMD5 := md5.New() + rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin") + c.Check(err, check.IsNil) + _, err = io.Copy(checkMD5, rdr) + c.Check(err, check.IsNil) + _, err = rdr.Read(make([]byte, 1)) + c.Check(err, check.Equals, io.EOF) + c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5) +} + +func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) { + s.kc.PutB([]byte("foo")) mt := ". " for i := 0; i < 1000; i++ { @@ -161,23 +179,45 @@ func (s *ServerRequiredSuite) TestCollectionReaderCloseEarly(c *check.C) { // Grab the stub server's lock, ensuring our cfReader doesn't // get anything back from its first call to kc.Get() before we // have a chance to call Close(). - h.lock <- struct{}{} - opsBeforeRead := *h.ops + s.handler.lock <- struct{}{} + opsBeforeRead := *s.handler.ops - rdr, err := kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt") + rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt") c.Assert(err, check.IsNil) + + firstReadDone := make(chan struct{}) + go func() { + rdr.Read(make([]byte, 6)) + firstReadDone <- struct{}{} + }() err = rdr.Close() c.Assert(err, check.IsNil) - c.Assert(rdr.Error(), check.IsNil) + c.Assert(rdr.(*cfReader).Error(), check.IsNil) // Release the stub server's lock. The first GET operation will proceed. - <-h.lock + <-s.handler.lock + + // Make sure our first read operation consumes the data + // received from the first GET. + <-firstReadDone // doGet() should close toRead before sending any more bufs to it. - if what, ok := <-rdr.toRead; ok { - c.Errorf("Got %+v, expected toRead to be closed", what) + if what, ok := <-rdr.(*cfReader).toRead; ok { + c.Errorf("Got %q, expected toRead to be closed", what) } // Stub should have handled exactly one GET request. - c.Assert(*h.ops, check.Equals, opsBeforeRead+1) + c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1) +} + +func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) { + manifest := ". ffffffffffffffffffffffffffffffff+1 0:1:notfound.txt\n" + buf := make([]byte, 1) + rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "notfound.txt") + c.Check(err, check.IsNil) + for i := 0; i < 2; i++ { + _, err = io.ReadFull(rdr, buf) + c.Check(err, check.NotNil) + c.Check(err, check.Not(check.Equals), io.EOF) + } }