+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package keepclient
import (
"crypto/md5"
- "crypto/rand"
"fmt"
"io"
"io/ioutil"
+ "math/rand"
"net/http"
"os"
"strconv"
// Register CollectionReaderUnit with the check test runner.
var _ = check.Suite(&CollectionReaderUnit{})
// CollectionReaderUnit is the test suite for the collection file reader.
// It carries the API client, the KeepClient under test, and the stub
// handler whose lock and ops counter the tests inspect.
type CollectionReaderUnit struct {
// arv is the Arvados API client; the change below turns it from a value
// into a pointer so it can be handed to MakeKeepClient directly.
- arv arvadosclient.ArvadosClient
+ arv *arvadosclient.ArvadosClient
// kc is the KeepClient used by every test in this suite.
kc *KeepClient
// handler is the stub server handler.
// NOTE(review): presumably it serves the Keep GET/PUT requests issued by kc — confirm against SuccessHandler.
handler SuccessHandler
}
c.Assert(err, check.IsNil)
s.arv.ApiToken = arvadostest.ActiveToken
- s.kc, err = MakeKeepClient(&s.arv)
+ s.kc, err = MakeKeepClient(s.arv)
c.Assert(err, check.IsNil)
s.handler = SuccessHandler{
for _, testCase := range []rdrTest{
{mt: mt, f: "zzzz", want: os.ErrNotExist},
{mt: mt, f: "frob", want: os.ErrNotExist},
- {mt: mt, f: "/segmented/frob", want: os.ErrNotExist},
- {mt: mt, f: "./segmented/frob", want: os.ErrNotExist},
- {mt: mt, f: "/f", want: os.ErrNotExist},
- {mt: mt, f: "./f", want: os.ErrNotExist},
- {mt: mt, f: "foo bar//baz", want: os.ErrNotExist},
+ {mt: mt, f: "/segmented/frob", want: "frob"},
+ {mt: mt, f: "./segmented/frob", want: "frob"},
+ {mt: mt, f: "/f", want: "f"},
+ {mt: mt, f: "./f", want: "f"},
+ {mt: mt, f: "foo bar//baz", want: "foo"},
{mt: mt, f: "foo/zero", want: ""},
{mt: mt, f: "zero@0", want: ""},
{mt: mt, f: "zero@1", want: ""},
{mt: mt, f: "segmented/frob", want: "frob"},
{mt: mt, f: "segmented/oof", want: "oof"},
} {
+ c.Logf("%#v", testCase)
rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
switch want := testCase.want.(type) {
case error:
c.Check(n, check.Equals, 0)
c.Check(err, check.Equals, io.EOF)
}
+
+ for a := len(want) - 2; a >= 0; a-- {
+ for b := a + 1; b <= len(want); b++ {
+ offset, err := rdr.Seek(int64(a), io.SeekStart)
+ c.Logf("...a=%d, b=%d", a, b)
+ c.Check(err, check.IsNil)
+ c.Check(offset, check.Equals, int64(a))
+ buf := make([]byte, b-a)
+ n, err := io.ReadFull(rdr, buf)
+ c.Check(n, check.Equals, b-a)
+ c.Check(string(buf), check.Equals, want[a:b])
+ }
+ }
+ offset, err := rdr.Seek(-1, io.SeekStart)
+ c.Check(err, check.NotNil)
+ c.Check(offset, check.Equals, int64(len(want)))
+
c.Check(rdr.Close(), check.Equals, nil)
}
}
// TestCollectionReaderManyBlocks writes 4096 blocks of random bytes (block i
// holds i bytes), reads them back as a single file through
// CollectionFileReader, and compares the MD5 of what was read with the MD5 of
// what was written. The added lines then exercise Seek on the same reader:
// random seek+read rounds, seeking past the end, and seeking before the start.
func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
h := md5.New()
// testdata accumulates every byte written, in file order, for later
// comparison with data read back after seeking.
+ var testdata []byte
buf := make([]byte, 4096)
locs := make([]string, len(buf))
filesize := 0
for i := 0; i < len(locs); i++ {
- _, err := io.ReadFull(rand.Reader, buf[:i])
- c.Assert(err, check.IsNil)
// NOTE(review): this err is overwritten by PutB on the next statement
// before it is ever checked.
+ _, err := rand.Read(buf[:i])
h.Write(buf[:i])
locs[i], _, err = s.kc.PutB(buf[:i])
c.Assert(err, check.IsNil)
filesize += i
+ testdata = append(testdata, buf[:i]...)
}
// One stream "./random" holding all block locators, with a single file
// segment covering the whole stream.
manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n"
dataMD5 := h.Sum(nil)
checkMD5 := md5.New()
rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin")
- c.Check(err, check.IsNil)
// Assert rather than Check: the deferred Close and everything below
// need a usable reader.
+ c.Assert(err, check.IsNil)
+ defer rdr.Close()
+
_, err = io.Copy(checkMD5, rdr)
c.Check(err, check.IsNil)
// After a full copy, one more Read must report EOF.
_, err = rdr.Read(make([]byte, 1))
c.Check(err, check.Equals, io.EOF)
c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5)
+
// Seeking to the end yields the file size as the resulting offset.
+ size, err := rdr.Seek(0, io.SeekEnd)
+ c.Check(err, check.IsNil)
// buf starts as a copy of testdata; each round below overwrites a random
// range of buf with bytes read from the same range of the file, so buf
// still equals testdata afterwards only if every read returned the
// right bytes.
// NOTE(review): size is never compared with len(testdata).
+ buf = make([]byte, len(testdata))
+ copy(buf, testdata)
+ curPos := size
+ for i := 0; i < 16; i++ {
+ offset := rand.Intn(len(buf) - 1)
+ count := rand.Intn(len(buf) - offset)
// Reach the same absolute offset either with a relative seek from the
// current position or with an absolute seek, chosen at random.
// NOTE(review): err from either Seek is not checked; only curPos is.
+ if rand.Intn(2) == 0 {
+ curPos, err = rdr.Seek(int64(offset)-curPos, io.SeekCurrent)
+ } else {
+ curPos, err = rdr.Seek(int64(offset), io.SeekStart)
+ }
+ c.Check(curPos, check.Equals, int64(offset))
// Read exactly count bytes, tolerating short reads.
+ for count > 0 {
+ n, err := rdr.Read(buf[offset : offset+count])
+ c.Assert(err, check.IsNil)
+ c.Assert(n > 0, check.Equals, true)
+ offset += n
+ count -= n
+ }
// Seek(0, SeekCurrent) reports the current position, which must have
// advanced by the number of bytes read.
// NOTE(review): err from this Seek is not checked.
+ curPos, err = rdr.Seek(0, io.SeekCurrent)
+ c.Check(curPos, check.Equals, int64(offset))
+ }
+ c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
+ c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
+
// Seeking beyond the end is expected to succeed and leave the position
// at the end of the file.
+ curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
+ c.Check(err, check.IsNil)
+ c.Check(curPos, check.Equals, size)
+
// From the end, a relative seek of 8-size lands on absolute offset 8.
+ curPos, err = rdr.Seek(8-size, io.SeekCurrent)
+ c.Check(err, check.IsNil)
+ c.Check(curPos, check.Equals, int64(8))
+
// A relative seek to before the start is expected to fail and leave the
// position unchanged at 8.
+ curPos, err = rdr.Seek(-9, io.SeekCurrent)
+ c.Check(err, check.NotNil)
+ c.Check(curPos, check.Equals, int64(8))
}
// TestCollectionReaderCloseEarly opens a file made of many small blocks,
// reads only the first few bytes, closes the reader, and checks that the stub
// server handled exactly one GET request in total.
func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
// Install a fresh BlockCache on the client for this test.
// NOTE(review): presumably so blocks cached by earlier tests cannot satisfy reads without a GET — confirm.
+ s.kc.BlockCache = &BlockCache{}
s.kc.PutB([]byte("foo"))
+ s.kc.PutB([]byte("bar"))
+ s.kc.PutB([]byte("baz"))
mt := ". "
// The added version repeats three 3-byte locators 300 times: 900 blocks,
// 2700 bytes, matching the "0:2700:foo900.txt" segment below.
// NOTE(review): the three hashes are presumably the MD5 locators of
// "foo", "bar" and "baz" — confirm.
- for i := 0; i < 1000; i++ {
- mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 "
+ for i := 0; i < 300; i++ {
+ mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3 "
}
- mt += "0:3000:foo1000.txt\n"
+ mt += "0:2700:foo900.txt\n"
// NOTE(review): the comment below ends mid-sentence in this view;
// its remaining line(s) are not visible here.
// Grab the stub server's lock, ensuring our cfReader doesn't
// get anything back from its first call to kc.Get() before we
s.handler.lock <- struct{}{}
// Snapshot the stub's operation counter so the final check can count
// only the requests made by this test's reader.
opsBeforeRead := *s.handler.ops
- rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo900.txt")
c.Assert(err, check.IsNil)
// The first read runs in its own goroutine because the stub server's
// lock is held until it is released below.
// NOTE(review): presumably the read blocks until then — confirm against SuccessHandler.
firstReadDone := make(chan struct{})
go func() {
- rdr.Read(make([]byte, 6))
- firstReadDone <- struct{}{}
// The added version reads exactly one block's worth (3 bytes), checks
// the result, and signals completion by closing the channel.
+ n, err := rdr.Read(make([]byte, 3))
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+ close(firstReadDone)
}()
- err = rdr.Close()
- c.Assert(err, check.IsNil)
- c.Assert(rdr.(*cfReader).Error(), check.IsNil)
// Release the stub server's lock. The first GET operation will proceed.
<-s.handler.lock
// NOTE(review): the comment below is the tail of a sentence whose
// beginning is not visible in this view.
// received from the first GET.
<-firstReadDone
- // doGet() should close toRead before sending any more bufs to it.
- if what, ok := <-rdr.(*cfReader).toRead; ok {
- c.Errorf("Got %q, expected toRead to be closed", what)
- }
// In the added version Close happens only after the first read has
// completed, rather than while it is still pending.
+ err = rdr.Close()
+ c.Check(err, check.IsNil)
// Stub should have handled exactly one GET request.
- c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1)
+ c.Check(*s.handler.ops, check.Equals, opsBeforeRead+1)
}
func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
c.Check(err, check.NotNil)
c.Check(err, check.Not(check.Equals), io.EOF)
}
+ c.Check(rdr.Close(), check.IsNil)
}