20318: Track estimated cache usage, and tidy more diligently.
[arvados.git] / sdk / go / keepclient / collectionreader_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "crypto/md5"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "math/rand"
13         "net/http"
14         "os"
15         "strconv"
16         "strings"
17
18         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19         "git.arvados.org/arvados.git/sdk/go/arvadostest"
20         check "gopkg.in/check.v1"
21 )
22
23 var _ = check.Suite(&CollectionReaderUnit{})
24
25 type CollectionReaderUnit struct {
26         arv     *arvadosclient.ArvadosClient
27         kc      *KeepClient
28         handler SuccessHandler
29 }
30
31 func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
32         var err error
33         s.arv, err = arvadosclient.MakeArvadosClient()
34         c.Assert(err, check.IsNil)
35         s.arv.ApiToken = arvadostest.ActiveToken
36
37         s.kc, err = MakeKeepClient(s.arv)
38         c.Assert(err, check.IsNil)
39
40         s.handler = SuccessHandler{
41                 disk: make(map[string][]byte),
42                 lock: make(chan struct{}, 1),
43                 ops:  new(int),
44         }
45         localRoots := make(map[string]string)
46         for i, k := range RunSomeFakeKeepServers(s.handler, 4) {
47                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
48         }
49         s.kc.SetServiceRoots(localRoots, localRoots, nil)
50 }
51
// SuccessHandler is an in-memory stub HTTP handler for Keep block
// requests: PUT stores the request body under its "md5hex+size"
// locator, and GET returns a previously stored body.
//
// ServeHTTP has a value receiver, so copies of a SuccessHandler share
// state only through the map, channel and pointer fields below.
type SuccessHandler struct {
	disk map[string][]byte // stored blocks, keyed by "md5hex+size"
	lock chan struct{}     // channel with buffer==1: full when an operation is in progress.
	ops  *int              // number of operations completed
}

// ServeHTTP handles PUT and GET requests against the in-memory block
// store and answers any other method with 405 Method Not Allowed.
//
// PUT reads the whole body, stores it, and responds with the block's
// "md5hex+size" locator; a body read error yields 500. GET looks up
// the locator given by the URL path (without its leading slash) and
// responds with the stored bytes, or 404 if there is no such block.
// Each PUT or GET that reaches the store increments *ops when ops is
// non-nil.
func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	switch req.Method {
	case "PUT":
		buf, err := ioutil.ReadAll(req.Body)
		if err != nil {
			resp.WriteHeader(http.StatusInternalServerError)
			return
		}
		pdh := fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf))
		// Filling the one-slot channel serializes access to disk and ops.
		h.lock <- struct{}{}
		h.disk[pdh] = buf
		if h.ops != nil {
			(*h.ops)++
		}
		<-h.lock
		resp.Write([]byte(pdh))
	case "GET":
		// TrimPrefix instead of Path[1:]: slicing an empty path
		// panics, whereas an empty locator simply is not found.
		pdh := strings.TrimPrefix(req.URL.Path, "/")
		h.lock <- struct{}{}
		buf, ok := h.disk[pdh]
		if h.ops != nil {
			(*h.ops)++
		}
		<-h.lock
		if !ok {
			resp.WriteHeader(http.StatusNotFound)
			return
		}
		resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(buf)))
		resp.Write(buf)
	default:
		resp.WriteHeader(http.StatusMethodNotAllowed)
	}
}
92
// rdrTest is one table entry for TestCollectionReaderContent.
type rdrTest struct {
	// mt is the manifest text.
	mt string
	// f is the filename.
	f string
	// want is the error or string to expect.
	want interface{}
}
98
99 func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
100         s.kc.PutB([]byte("foo"))
101         s.kc.PutB([]byte("bar"))
102         s.kc.PutB([]byte("Hello world\n"))
103         s.kc.PutB([]byte(""))
104
105         mt := arvadostest.PathologicalManifest
106
107         for _, testCase := range []rdrTest{
108                 {mt: mt, f: "zzzz", want: os.ErrNotExist},
109                 {mt: mt, f: "frob", want: os.ErrNotExist},
110                 {mt: mt, f: "/segmented/frob", want: "frob"},
111                 {mt: mt, f: "./segmented/frob", want: "frob"},
112                 {mt: mt, f: "/f", want: "f"},
113                 {mt: mt, f: "./f", want: "f"},
114                 {mt: mt, f: "foo bar//baz", want: "foo"},
115                 {mt: mt, f: "foo/zero", want: ""},
116                 {mt: mt, f: "zero@0", want: ""},
117                 {mt: mt, f: "zero@1", want: ""},
118                 {mt: mt, f: "zero@4", want: ""},
119                 {mt: mt, f: "zero@9", want: ""},
120                 {mt: mt, f: "f", want: "f"},
121                 {mt: mt, f: "ooba", want: "ooba"},
122                 {mt: mt, f: "overlapReverse/o", want: "o"},
123                 {mt: mt, f: "overlapReverse/oo", want: "oo"},
124                 {mt: mt, f: "overlapReverse/ofoo", want: "ofoo"},
125                 {mt: mt, f: "foo bar/baz", want: "foo"},
126                 {mt: mt, f: "segmented/frob", want: "frob"},
127                 {mt: mt, f: "segmented/oof", want: "oof"},
128         } {
129                 c.Logf("%#v", testCase)
130                 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
131                 switch want := testCase.want.(type) {
132                 case error:
133                         c.Check(rdr, check.IsNil)
134                         c.Check(err, check.Equals, want)
135                 case string:
136                         buf := make([]byte, len(want))
137                         _, err := io.ReadFull(rdr, buf)
138                         c.Check(err, check.IsNil)
139                         for i := 0; i < 4; i++ {
140                                 c.Check(string(buf), check.Equals, want)
141                                 n, err := rdr.Read(buf)
142                                 c.Check(n, check.Equals, 0)
143                                 c.Check(err, check.Equals, io.EOF)
144                         }
145
146                         for a := len(want) - 2; a >= 0; a-- {
147                                 for b := a + 1; b <= len(want); b++ {
148                                         offset, err := rdr.Seek(int64(a), io.SeekStart)
149                                         c.Logf("...a=%d, b=%d", a, b)
150                                         c.Check(err, check.IsNil)
151                                         c.Check(offset, check.Equals, int64(a))
152                                         buf := make([]byte, b-a)
153                                         n, err := io.ReadFull(rdr, buf)
154                                         c.Check(err, check.IsNil)
155                                         c.Check(n, check.Equals, b-a)
156                                         c.Check(string(buf), check.Equals, want[a:b])
157                                 }
158                         }
159                         offset, err := rdr.Seek(-1, io.SeekStart)
160                         c.Check(err, check.NotNil)
161                         c.Check(offset, check.Equals, int64(len(want)))
162
163                         c.Check(rdr.Close(), check.Equals, nil)
164                 }
165         }
166 }
167
168 func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
169         h := md5.New()
170         buf := make([]byte, 4096)
171         locs := make([]string, len(buf))
172         testdata := make([]byte, 0, len(buf)*len(buf))
173         filesize := 0
174         for i := range locs {
175                 _, err := rand.Read(buf[:i])
176                 c.Assert(err, check.IsNil)
177                 h.Write(buf[:i])
178                 locs[i], _, err = s.kc.PutB(buf[:i])
179                 c.Assert(err, check.IsNil)
180                 filesize += i
181                 testdata = append(testdata, buf[:i]...)
182         }
183         manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n"
184         dataMD5 := h.Sum(nil)
185
186         checkMD5 := md5.New()
187         rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin")
188         c.Assert(err, check.IsNil)
189         defer rdr.Close()
190
191         _, err = io.Copy(checkMD5, rdr)
192         c.Check(err, check.IsNil)
193         _, err = rdr.Read(make([]byte, 1))
194         c.Check(err, check.Equals, io.EOF)
195         c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5)
196
197         size, err := rdr.Seek(0, io.SeekEnd)
198         c.Check(err, check.IsNil)
199         buf = make([]byte, len(testdata))
200         copy(buf, testdata)
201         curPos := size
202         for i := 0; i < 16; i++ {
203                 offset := rand.Intn(len(buf) - 1)
204                 count := rand.Intn(len(buf) - offset)
205                 if rand.Intn(2) == 0 {
206                         curPos, _ = rdr.Seek(int64(offset)-curPos, io.SeekCurrent)
207                 } else {
208                         curPos, _ = rdr.Seek(int64(offset), io.SeekStart)
209                 }
210                 c.Check(curPos, check.Equals, int64(offset))
211                 for count > 0 {
212                         n, err := rdr.Read(buf[offset : offset+count])
213                         c.Assert(err, check.IsNil)
214                         c.Assert(n > 0, check.Equals, true)
215                         offset += n
216                         count -= n
217                 }
218                 curPos, err = rdr.Seek(0, io.SeekCurrent)
219                 c.Check(err, check.IsNil)
220                 c.Check(curPos, check.Equals, int64(offset))
221         }
222         c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
223         c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
224
225         expectPos := curPos + size + 12345
226         curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
227         c.Check(err, check.IsNil)
228         c.Check(curPos, check.Equals, expectPos)
229
230         curPos, err = rdr.Seek(8-curPos, io.SeekCurrent)
231         c.Check(err, check.IsNil)
232         c.Check(curPos, check.Equals, int64(8))
233
234         curPos, err = rdr.Seek(-9, io.SeekCurrent)
235         c.Check(err, check.NotNil)
236         c.Check(curPos, check.Equals, int64(8))
237 }
238
239 func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
240         // Disable cache
241         s.kc.gatewayStack = &keepViaHTTP{s.kc}
242
243         s.kc.PutB([]byte("foo"))
244         s.kc.PutB([]byte("bar"))
245         s.kc.PutB([]byte("baz"))
246
247         mt := ". "
248         for i := 0; i < 300; i++ {
249                 mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3 "
250         }
251         mt += "0:2700:foo900.txt\n"
252
253         // Grab the stub server's lock, ensuring our cfReader doesn't
254         // get anything back from its first call to kc.Get() before we
255         // have a chance to call Close().
256         s.handler.lock <- struct{}{}
257         opsBeforeRead := *s.handler.ops
258
259         rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo900.txt")
260         c.Assert(err, check.IsNil)
261
262         firstReadDone := make(chan struct{})
263         go func() {
264                 n, err := rdr.Read(make([]byte, 3))
265                 c.Check(n, check.Equals, 3)
266                 c.Check(err, check.IsNil)
267                 close(firstReadDone)
268         }()
269
270         // Release the stub server's lock. The first GET operation will proceed.
271         <-s.handler.lock
272
273         // Make sure our first read operation consumes the data
274         // received from the first GET.
275         <-firstReadDone
276
277         err = rdr.Close()
278         c.Check(err, check.IsNil)
279
280         // Stub should have handled exactly one GET request.
281         c.Check(*s.handler.ops, check.Equals, opsBeforeRead+1)
282 }
283
284 func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
285         manifest := ". ffffffffffffffffffffffffffffffff+1 0:1:notfound.txt\n"
286         buf := make([]byte, 1)
287         rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "notfound.txt")
288         c.Check(err, check.IsNil)
289         for i := 0; i < 2; i++ {
290                 _, err = io.ReadFull(rdr, buf)
291                 c.Check(err, check.NotNil)
292                 c.Check(err, check.Not(check.Equals), io.EOF)
293         }
294         c.Check(rdr.Close(), check.IsNil)
295 }