1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
18 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19 "git.arvados.org/arvados.git/sdk/go/arvadostest"
20 check "gopkg.in/check.v1"
23 var _ = check.Suite(&CollectionReaderUnit{})
25 type CollectionReaderUnit struct {
26 arv *arvadosclient.ArvadosClient
28 handler SuccessHandler
31 func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
33 s.arv, err = arvadosclient.MakeArvadosClient()
34 c.Assert(err, check.IsNil)
35 s.arv.ApiToken = arvadostest.ActiveToken
37 s.kc, err = MakeKeepClient(s.arv)
38 c.Assert(err, check.IsNil)
40 s.handler = SuccessHandler{
41 disk: make(map[string][]byte),
42 lock: make(chan struct{}, 1),
45 localRoots := make(map[string]string)
46 for i, k := range RunSomeFakeKeepServers(s.handler, 4) {
47 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
49 s.kc.SetServiceRoots(localRoots, localRoots, nil)
52 type SuccessHandler struct {
53 disk map[string][]byte
54 lock chan struct{} // channel with buffer==1: full when an operation is in progress.
55 ops *int // number of operations completed
58 func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
61 buf, err := ioutil.ReadAll(req.Body)
66 pdh := fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf))
73 resp.Write([]byte(pdh))
75 pdh := req.URL.Path[1:]
77 buf, ok := h.disk[pdh]
83 resp.WriteHeader(http.StatusNotFound)
85 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(buf)))
89 resp.WriteHeader(http.StatusMethodNotAllowed)
94 mt string // manifest text
96 want interface{} // error or string to expect
99 func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
100 s.kc.PutB([]byte("foo"))
101 s.kc.PutB([]byte("bar"))
102 s.kc.PutB([]byte("Hello world\n"))
103 s.kc.PutB([]byte(""))
105 mt := arvadostest.PathologicalManifest
107 for _, testCase := range []rdrTest{
108 {mt: mt, f: "zzzz", want: os.ErrNotExist},
109 {mt: mt, f: "frob", want: os.ErrNotExist},
110 {mt: mt, f: "/segmented/frob", want: "frob"},
111 {mt: mt, f: "./segmented/frob", want: "frob"},
112 {mt: mt, f: "/f", want: "f"},
113 {mt: mt, f: "./f", want: "f"},
114 {mt: mt, f: "foo bar//baz", want: "foo"},
115 {mt: mt, f: "foo/zero", want: ""},
116 {mt: mt, f: "zero@0", want: ""},
117 {mt: mt, f: "zero@1", want: ""},
118 {mt: mt, f: "zero@4", want: ""},
119 {mt: mt, f: "zero@9", want: ""},
120 {mt: mt, f: "f", want: "f"},
121 {mt: mt, f: "ooba", want: "ooba"},
122 {mt: mt, f: "overlapReverse/o", want: "o"},
123 {mt: mt, f: "overlapReverse/oo", want: "oo"},
124 {mt: mt, f: "overlapReverse/ofoo", want: "ofoo"},
125 {mt: mt, f: "foo bar/baz", want: "foo"},
126 {mt: mt, f: "segmented/frob", want: "frob"},
127 {mt: mt, f: "segmented/oof", want: "oof"},
129 c.Logf("%#v", testCase)
130 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
131 switch want := testCase.want.(type) {
133 c.Check(rdr, check.IsNil)
134 c.Check(err, check.Equals, want)
136 buf := make([]byte, len(want))
137 _, err := io.ReadFull(rdr, buf)
138 c.Check(err, check.IsNil)
139 for i := 0; i < 4; i++ {
140 c.Check(string(buf), check.Equals, want)
141 n, err := rdr.Read(buf)
142 c.Check(n, check.Equals, 0)
143 c.Check(err, check.Equals, io.EOF)
146 for a := len(want) - 2; a >= 0; a-- {
147 for b := a + 1; b <= len(want); b++ {
148 offset, err := rdr.Seek(int64(a), io.SeekStart)
149 c.Logf("...a=%d, b=%d", a, b)
150 c.Check(err, check.IsNil)
151 c.Check(offset, check.Equals, int64(a))
152 buf := make([]byte, b-a)
153 n, err := io.ReadFull(rdr, buf)
154 c.Check(err, check.IsNil)
155 c.Check(n, check.Equals, b-a)
156 c.Check(string(buf), check.Equals, want[a:b])
159 offset, err := rdr.Seek(-1, io.SeekStart)
160 c.Check(err, check.NotNil)
161 c.Check(offset, check.Equals, int64(len(want)))
163 c.Check(rdr.Close(), check.Equals, nil)
168 func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
170 buf := make([]byte, 4096)
171 locs := make([]string, len(buf))
172 testdata := make([]byte, 0, len(buf)*len(buf))
174 for i := range locs {
175 _, err := rand.Read(buf[:i])
176 c.Assert(err, check.IsNil)
178 locs[i], _, err = s.kc.PutB(buf[:i])
179 c.Assert(err, check.IsNil)
181 testdata = append(testdata, buf[:i]...)
183 manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n"
184 dataMD5 := h.Sum(nil)
186 checkMD5 := md5.New()
187 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin")
188 c.Assert(err, check.IsNil)
191 _, err = io.Copy(checkMD5, rdr)
192 c.Check(err, check.IsNil)
193 _, err = rdr.Read(make([]byte, 1))
194 c.Check(err, check.Equals, io.EOF)
195 c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5)
197 size, err := rdr.Seek(0, io.SeekEnd)
198 c.Check(err, check.IsNil)
199 buf = make([]byte, len(testdata))
202 for i := 0; i < 16; i++ {
203 offset := rand.Intn(len(buf) - 1)
204 count := rand.Intn(len(buf) - offset)
205 if rand.Intn(2) == 0 {
206 curPos, _ = rdr.Seek(int64(offset)-curPos, io.SeekCurrent)
208 curPos, _ = rdr.Seek(int64(offset), io.SeekStart)
210 c.Check(curPos, check.Equals, int64(offset))
212 n, err := rdr.Read(buf[offset : offset+count])
213 c.Assert(err, check.IsNil)
214 c.Assert(n > 0, check.Equals, true)
218 curPos, err = rdr.Seek(0, io.SeekCurrent)
219 c.Check(err, check.IsNil)
220 c.Check(curPos, check.Equals, int64(offset))
222 c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
223 c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
225 expectPos := curPos + size + 12345
226 curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
227 c.Check(err, check.IsNil)
228 c.Check(curPos, check.Equals, expectPos)
230 curPos, err = rdr.Seek(8-curPos, io.SeekCurrent)
231 c.Check(err, check.IsNil)
232 c.Check(curPos, check.Equals, int64(8))
234 curPos, err = rdr.Seek(-9, io.SeekCurrent)
235 c.Check(err, check.NotNil)
236 c.Check(curPos, check.Equals, int64(8))
239 func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
241 s.kc.gatewayStack = &keepViaHTTP{s.kc}
243 s.kc.PutB([]byte("foo"))
244 s.kc.PutB([]byte("bar"))
245 s.kc.PutB([]byte("baz"))
248 for i := 0; i < 300; i++ {
249 mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3 "
251 mt += "0:2700:foo900.txt\n"
253 // Grab the stub server's lock, ensuring our cfReader doesn't
254 // get anything back from its first call to kc.Get() before we
255 // have a chance to call Close().
256 s.handler.lock <- struct{}{}
257 opsBeforeRead := *s.handler.ops
259 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo900.txt")
260 c.Assert(err, check.IsNil)
262 firstReadDone := make(chan struct{})
264 n, err := rdr.Read(make([]byte, 3))
265 c.Check(n, check.Equals, 3)
266 c.Check(err, check.IsNil)
270 // Release the stub server's lock. The first GET operation will proceed.
273 // Make sure our first read operation consumes the data
274 // received from the first GET.
278 c.Check(err, check.IsNil)
280 // Stub should have handled exactly one GET request.
281 c.Check(*s.handler.ops, check.Equals, opsBeforeRead+1)
284 func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
285 manifest := ". ffffffffffffffffffffffffffffffff+1 0:1:notfound.txt\n"
286 buf := make([]byte, 1)
287 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "notfound.txt")
288 c.Check(err, check.IsNil)
289 for i := 0; i < 2; i++ {
290 _, err = io.ReadFull(rdr, buf)
291 c.Check(err, check.NotNil)
292 c.Check(err, check.Not(check.Equals), io.EOF)
294 c.Check(rdr.Close(), check.IsNil)