Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / keepclient / collectionreader_test.go
index 94e41e2bc2d69e50f4e10502aae75c22ef5574b8..813a335df7f5033305b9e4041a57023d6e021197 100644 (file)
@@ -1,11 +1,15 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package keepclient
 
 import (
        "crypto/md5"
-       "crypto/rand"
        "fmt"
        "io"
        "io/ioutil"
+       "math/rand"
        "net/http"
        "os"
        "strconv"
@@ -19,7 +23,7 @@ import (
 var _ = check.Suite(&CollectionReaderUnit{})
 
 type CollectionReaderUnit struct {
-       arv     arvadosclient.ArvadosClient
+       arv     *arvadosclient.ArvadosClient
        kc      *KeepClient
        handler SuccessHandler
 }
@@ -30,13 +34,13 @@ func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
        c.Assert(err, check.IsNil)
        s.arv.ApiToken = arvadostest.ActiveToken
 
-       s.kc, err = MakeKeepClient(&s.arv)
+       s.kc, err = MakeKeepClient(s.arv)
        c.Assert(err, check.IsNil)
 
        s.handler = SuccessHandler{
                disk: make(map[string][]byte),
                lock: make(chan struct{}, 1),
-               ops: new(int),
+               ops:  new(int),
        }
        localRoots := make(map[string]string)
        for i, k := range RunSomeFakeKeepServers(s.handler, 4) {
@@ -47,8 +51,8 @@ func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
 
 type SuccessHandler struct {
        disk map[string][]byte
-       lock chan struct{}      // channel with buffer==1: full when an operation is in progress.
-       ops  *int               // number of operations completed
+       lock chan struct{} // channel with buffer==1: full when an operation is in progress.
+       ops  *int          // number of operations completed
 }
 
 func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
@@ -65,7 +69,7 @@ func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
                if h.ops != nil {
                        (*h.ops)++
                }
-               <- h.lock
+               <-h.lock
                resp.Write([]byte(pdh))
        case "GET":
                pdh := req.URL.Path[1:]
@@ -74,7 +78,7 @@ func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
                if h.ops != nil {
                        (*h.ops)++
                }
-               <- h.lock
+               <-h.lock
                if !ok {
                        resp.WriteHeader(http.StatusNotFound)
                } else {
@@ -102,11 +106,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
        for _, testCase := range []rdrTest{
                {mt: mt, f: "zzzz", want: os.ErrNotExist},
                {mt: mt, f: "frob", want: os.ErrNotExist},
-               {mt: mt, f: "/segmented/frob", want: os.ErrNotExist},
-               {mt: mt, f: "./segmented/frob", want: os.ErrNotExist},
-               {mt: mt, f: "/f", want: os.ErrNotExist},
-               {mt: mt, f: "./f", want: os.ErrNotExist},
-               {mt: mt, f: "foo bar//baz", want: os.ErrNotExist},
+               {mt: mt, f: "/segmented/frob", want: "frob"},
+               {mt: mt, f: "./segmented/frob", want: "frob"},
+               {mt: mt, f: "/f", want: "f"},
+               {mt: mt, f: "./f", want: "f"},
+               {mt: mt, f: "foo bar//baz", want: "foo"},
                {mt: mt, f: "foo/zero", want: ""},
                {mt: mt, f: "zero@0", want: ""},
                {mt: mt, f: "zero@1", want: ""},
@@ -121,6 +125,7 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
                {mt: mt, f: "segmented/frob", want: "frob"},
                {mt: mt, f: "segmented/oof", want: "oof"},
        } {
+               c.Logf("%#v", testCase)
                rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
                switch want := testCase.want.(type) {
                case error:
@@ -136,6 +141,23 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
                                c.Check(n, check.Equals, 0)
                                c.Check(err, check.Equals, io.EOF)
                        }
+
+                       for a := len(want) - 2; a >= 0; a-- {
+                               for b := a + 1; b <= len(want); b++ {
+                                       offset, err := rdr.Seek(int64(a), io.SeekStart)
+                                       c.Logf("...a=%d, b=%d", a, b)
+                                       c.Check(err, check.IsNil)
+                                       c.Check(offset, check.Equals, int64(a))
+                                       buf := make([]byte, b-a)
+                                       n, err := io.ReadFull(rdr, buf)
+                                       c.Check(n, check.Equals, b-a)
+                                       c.Check(string(buf), check.Equals, want[a:b])
+                               }
+                       }
+                       offset, err := rdr.Seek(-1, io.SeekStart)
+                       c.Check(err, check.NotNil)
+                       c.Check(offset, check.Equals, int64(len(want)))
+
                        c.Check(rdr.Close(), check.Equals, nil)
                }
        }
@@ -143,38 +165,83 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
 
 func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
        h := md5.New()
+       var testdata []byte
        buf := make([]byte, 4096)
        locs := make([]string, len(buf))
        filesize := 0
        for i := 0; i < len(locs); i++ {
-               _, err := io.ReadFull(rand.Reader, buf[:i])
-               c.Assert(err, check.IsNil)
+               _, err := rand.Read(buf[:i])
                h.Write(buf[:i])
                locs[i], _, err = s.kc.PutB(buf[:i])
                c.Assert(err, check.IsNil)
                filesize += i
+               testdata = append(testdata, buf[:i]...)
        }
        manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n"
        dataMD5 := h.Sum(nil)
 
        checkMD5 := md5.New()
        rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin")
-       c.Check(err, check.IsNil)
+       c.Assert(err, check.IsNil)
+       defer rdr.Close()
+
        _, err = io.Copy(checkMD5, rdr)
        c.Check(err, check.IsNil)
        _, err = rdr.Read(make([]byte, 1))
        c.Check(err, check.Equals, io.EOF)
        c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5)
+
+       size, err := rdr.Seek(0, io.SeekEnd)
+       c.Check(err, check.IsNil)
+       buf = make([]byte, len(testdata))
+       copy(buf, testdata)
+       curPos := size
+       for i := 0; i < 16; i++ {
+               offset := rand.Intn(len(buf) - 1)
+               count := rand.Intn(len(buf) - offset)
+               if rand.Intn(2) == 0 {
+                       curPos, err = rdr.Seek(int64(offset)-curPos, io.SeekCurrent)
+               } else {
+                       curPos, err = rdr.Seek(int64(offset), io.SeekStart)
+               }
+               c.Check(curPos, check.Equals, int64(offset))
+               for count > 0 {
+                       n, err := rdr.Read(buf[offset : offset+count])
+                       c.Assert(err, check.IsNil)
+                       c.Assert(n > 0, check.Equals, true)
+                       offset += n
+                       count -= n
+               }
+               curPos, err = rdr.Seek(0, io.SeekCurrent)
+               c.Check(curPos, check.Equals, int64(offset))
+       }
+       c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
+       c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
+
+       curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
+       c.Check(err, check.IsNil)
+       c.Check(curPos, check.Equals, size)
+
+       curPos, err = rdr.Seek(8-size, io.SeekCurrent)
+       c.Check(err, check.IsNil)
+       c.Check(curPos, check.Equals, int64(8))
+
+       curPos, err = rdr.Seek(-9, io.SeekCurrent)
+       c.Check(err, check.NotNil)
+       c.Check(curPos, check.Equals, int64(8))
 }
 
 func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
+       s.kc.BlockCache = &BlockCache{}
        s.kc.PutB([]byte("foo"))
+       s.kc.PutB([]byte("bar"))
+       s.kc.PutB([]byte("baz"))
 
        mt := ". "
-       for i := 0; i < 1000; i++ {
-               mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 "
+       for i := 0; i < 300; i++ {
+               mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3 "
        }
-       mt += "0:3000:foo1000.txt\n"
+       mt += "0:2700:foo900.txt\n"
 
        // Grab the stub server's lock, ensuring our cfReader doesn't
        // get anything back from its first call to kc.Get() before we
@@ -182,17 +249,16 @@ func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
        s.handler.lock <- struct{}{}
        opsBeforeRead := *s.handler.ops
 
-       rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
+       rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo900.txt")
        c.Assert(err, check.IsNil)
 
        firstReadDone := make(chan struct{})
        go func() {
-               rdr.Read(make([]byte, 6))
-               firstReadDone <- struct{}{}
+               n, err := rdr.Read(make([]byte, 3))
+               c.Check(n, check.Equals, 3)
+               c.Check(err, check.IsNil)
+               close(firstReadDone)
        }()
-       err = rdr.Close()
-       c.Assert(err, check.IsNil)
-       c.Assert(rdr.Error(), check.IsNil)
 
        // Release the stub server's lock. The first GET operation will proceed.
        <-s.handler.lock
@@ -201,13 +267,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
        // received from the first GET.
        <-firstReadDone
 
-       // doGet() should close toRead before sending any more bufs to it.
-       if what, ok := <-rdr.toRead; ok {
-               c.Errorf("Got %q, expected toRead to be closed", string(what))
-       }
+       err = rdr.Close()
+       c.Check(err, check.IsNil)
 
        // Stub should have handled exactly one GET request.
-       c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1)
+       c.Check(*s.handler.ops, check.Equals, opsBeforeRead+1)
 }
 
 func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
@@ -217,7 +281,8 @@ func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
        c.Check(err, check.IsNil)
        for i := 0; i < 2; i++ {
                _, err = io.ReadFull(rdr, buf)
-               c.Check(err, check.Not(check.IsNil))
+               c.Check(err, check.NotNil)
                c.Check(err, check.Not(check.Equals), io.EOF)
        }
+       c.Check(rdr.Close(), check.IsNil)
 }