X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7314917d65573b0e9d55f7b6522463c470356fba..45c611003d84157370f7356b62ba8aa6972535a1:/sdk/go/keepclient/collectionreader_test.go diff --git a/sdk/go/keepclient/collectionreader_test.go b/sdk/go/keepclient/collectionreader_test.go index be4f386ff2..75603f1baa 100644 --- a/sdk/go/keepclient/collectionreader_test.go +++ b/sdk/go/keepclient/collectionreader_test.go @@ -1,18 +1,22 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package keepclient import ( "crypto/md5" - "crypto/rand" "fmt" "io" "io/ioutil" + "math/rand" "net/http" "os" "strconv" "strings" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/arvadostest" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/arvadostest" check "gopkg.in/check.v1" ) @@ -78,6 +82,7 @@ func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { if !ok { resp.WriteHeader(http.StatusNotFound) } else { + resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(buf))) resp.Write(buf) } default: @@ -102,11 +107,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) { for _, testCase := range []rdrTest{ {mt: mt, f: "zzzz", want: os.ErrNotExist}, {mt: mt, f: "frob", want: os.ErrNotExist}, - {mt: mt, f: "/segmented/frob", want: os.ErrNotExist}, - {mt: mt, f: "./segmented/frob", want: os.ErrNotExist}, - {mt: mt, f: "/f", want: os.ErrNotExist}, - {mt: mt, f: "./f", want: os.ErrNotExist}, - {mt: mt, f: "foo bar//baz", want: os.ErrNotExist}, + {mt: mt, f: "/segmented/frob", want: "frob"}, + {mt: mt, f: "./segmented/frob", want: "frob"}, + {mt: mt, f: "/f", want: "f"}, + {mt: mt, f: "./f", want: "f"}, + {mt: mt, f: "foo bar//baz", want: "foo"}, {mt: mt, f: "foo/zero", want: ""}, {mt: mt, f: "zero@0", want: ""}, {mt: mt, f: "zero@1", want: ""}, @@ -121,6 +126,7 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) { {mt: mt, f: "segmented/frob", want: "frob"}, {mt: mt, f: "segmented/oof", want: "oof"}, } { + c.Logf("%#v", testCase) rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f) switch want := testCase.want.(type) { case error: @@ -128,14 +134,32 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) { c.Check(err, check.Equals, want) case string: buf := make([]byte, len(want)) - n, err := io.ReadFull(rdr, buf) + _, err := io.ReadFull(rdr, buf) c.Check(err, check.IsNil) for i := 0; i < 4; i++ { c.Check(string(buf), check.Equals, want) - n, err = rdr.Read(buf) + n, err := rdr.Read(buf) c.Check(n, check.Equals, 0) c.Check(err, check.Equals, io.EOF) } + + for a := len(want) - 2; a >= 0; a-- { + for b := a + 1; b <= len(want); b++ { + offset, err := rdr.Seek(int64(a), io.SeekStart) + c.Logf("...a=%d, b=%d", a, b) + c.Check(err, check.IsNil) + c.Check(offset, check.Equals, int64(a)) + buf := make([]byte, b-a) + n, err := io.ReadFull(rdr, buf) + c.Check(err, check.IsNil) + c.Check(n, check.Equals, b-a) + c.Check(string(buf), check.Equals, want[a:b]) + } + } + offset, err := rdr.Seek(-1, io.SeekStart) + c.Check(err, check.NotNil) + c.Check(offset, check.Equals, int64(len(want))) + c.Check(rdr.Close(), check.Equals, nil) } } @@ -145,36 +169,84 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) { h := md5.New() buf := make([]byte, 4096) locs := make([]string, len(buf)) + testdata := make([]byte, 0, len(buf)*len(buf)) filesize := 0 - for i := 0; i < len(locs); i++ { - _, err := io.ReadFull(rand.Reader, buf[:i]) + for i := range locs { + _, err := rand.Read(buf[:i]) c.Assert(err, check.IsNil) h.Write(buf[:i]) locs[i], _, err = s.kc.PutB(buf[:i]) c.Assert(err, check.IsNil) filesize += i + testdata = append(testdata, buf[:i]...) } manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n" dataMD5 := h.Sum(nil) checkMD5 := md5.New() rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin") - c.Check(err, check.IsNil) + c.Assert(err, check.IsNil) + defer rdr.Close() + _, err = io.Copy(checkMD5, rdr) c.Check(err, check.IsNil) _, err = rdr.Read(make([]byte, 1)) c.Check(err, check.Equals, io.EOF) c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5) + + size, err := rdr.Seek(0, io.SeekEnd) + c.Check(err, check.IsNil) + buf = make([]byte, len(testdata)) + copy(buf, testdata) + curPos := size + for i := 0; i < 16; i++ { + offset := rand.Intn(len(buf) - 1) + count := rand.Intn(len(buf) - offset) + if rand.Intn(2) == 0 { + curPos, _ = rdr.Seek(int64(offset)-curPos, io.SeekCurrent) + } else { + curPos, _ = rdr.Seek(int64(offset), io.SeekStart) + } + c.Check(curPos, check.Equals, int64(offset)) + for count > 0 { + n, err := rdr.Read(buf[offset : offset+count]) + c.Assert(err, check.IsNil) + c.Assert(n > 0, check.Equals, true) + offset += n + count -= n + } + curPos, err = rdr.Seek(0, io.SeekCurrent) + c.Check(err, check.IsNil) + c.Check(curPos, check.Equals, int64(offset)) + } + c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata)) + c.Check(buf[:1000], check.DeepEquals, testdata[:1000]) + + expectPos := curPos + size + 12345 + curPos, err = rdr.Seek(size+12345, io.SeekCurrent) + c.Check(err, check.IsNil) + c.Check(curPos, check.Equals, expectPos) + + curPos, err = rdr.Seek(8-curPos, io.SeekCurrent) + c.Check(err, check.IsNil) + c.Check(curPos, check.Equals, int64(8)) + + curPos, err = rdr.Seek(-9, io.SeekCurrent) + c.Check(err, check.NotNil) + c.Check(curPos, check.Equals, int64(8)) } func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) { + s.kc.BlockCache = &BlockCache{} s.kc.PutB([]byte("foo")) + s.kc.PutB([]byte("bar")) + s.kc.PutB([]byte("baz")) mt := ". " - for i := 0; i < 1000; i++ { - mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 " + for i := 0; i < 300; i++ { + mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3 " } - mt += "0:3000:foo1000.txt\n" + mt += "0:2700:foo900.txt\n" // Grab the stub server's lock, ensuring our cfReader doesn't // get anything back from its first call to kc.Get() before we @@ -182,17 +254,16 @@ func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) { s.handler.lock <- struct{}{} opsBeforeRead := *s.handler.ops - rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt") + rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo900.txt") c.Assert(err, check.IsNil) firstReadDone := make(chan struct{}) go func() { - rdr.Read(make([]byte, 6)) - firstReadDone <- struct{}{} + n, err := rdr.Read(make([]byte, 3)) + c.Check(n, check.Equals, 3) + c.Check(err, check.IsNil) + close(firstReadDone) }() - err = rdr.Close() - c.Assert(err, check.IsNil) - c.Assert(rdr.(*cfReader).Error(), check.IsNil) // Release the stub server's lock. The first GET operation will proceed. <-s.handler.lock @@ -201,13 +272,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) { // received from the first GET. <-firstReadDone - // doGet() should close toRead before sending any more bufs to it. - if what, ok := <-rdr.(*cfReader).toRead; ok { - c.Errorf("Got %q, expected toRead to be closed", what) - } + err = rdr.Close() + c.Check(err, check.IsNil) // Stub should have handled exactly one GET request. - c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1) + c.Check(*s.handler.ops, check.Equals, opsBeforeRead+1) } func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) { @@ -220,5 +289,5 @@ func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) { c.Check(err, check.NotNil) c.Check(err, check.Not(check.Equals), io.EOF) } - c.Check(rdr.Close(), check.NotNil) + c.Check(rdr.Close(), check.IsNil) }