7824: Fix upper/lower case ordering on arv-ls.
[arvados.git] / sdk / go / keepclient / collectionreader_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "math/rand"
9         "net/http"
10         "os"
11         "strconv"
12         "strings"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
16         check "gopkg.in/check.v1"
17 )
18
19 var _ = check.Suite(&CollectionReaderUnit{})
20
21 type CollectionReaderUnit struct {
22         arv     *arvadosclient.ArvadosClient
23         kc      *KeepClient
24         handler SuccessHandler
25 }
26
27 func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
28         var err error
29         s.arv, err = arvadosclient.MakeArvadosClient()
30         c.Assert(err, check.IsNil)
31         s.arv.ApiToken = arvadostest.ActiveToken
32
33         s.kc, err = MakeKeepClient(s.arv)
34         c.Assert(err, check.IsNil)
35
36         s.handler = SuccessHandler{
37                 disk: make(map[string][]byte),
38                 lock: make(chan struct{}, 1),
39                 ops:  new(int),
40         }
41         localRoots := make(map[string]string)
42         for i, k := range RunSomeFakeKeepServers(s.handler, 4) {
43                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
44         }
45         s.kc.SetServiceRoots(localRoots, localRoots, nil)
46 }
47
48 type SuccessHandler struct {
49         disk map[string][]byte
50         lock chan struct{} // channel with buffer==1: full when an operation is in progress.
51         ops  *int          // number of operations completed
52 }
53
54 func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
55         switch req.Method {
56         case "PUT":
57                 buf, err := ioutil.ReadAll(req.Body)
58                 if err != nil {
59                         resp.WriteHeader(500)
60                         return
61                 }
62                 pdh := fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf))
63                 h.lock <- struct{}{}
64                 h.disk[pdh] = buf
65                 if h.ops != nil {
66                         (*h.ops)++
67                 }
68                 <-h.lock
69                 resp.Write([]byte(pdh))
70         case "GET":
71                 pdh := req.URL.Path[1:]
72                 h.lock <- struct{}{}
73                 buf, ok := h.disk[pdh]
74                 if h.ops != nil {
75                         (*h.ops)++
76                 }
77                 <-h.lock
78                 if !ok {
79                         resp.WriteHeader(http.StatusNotFound)
80                 } else {
81                         resp.Write(buf)
82                 }
83         default:
84                 resp.WriteHeader(http.StatusMethodNotAllowed)
85         }
86 }
87
88 type rdrTest struct {
89         mt   string      // manifest text
90         f    string      // filename
91         want interface{} // error or string to expect
92 }
93
94 func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
95         s.kc.PutB([]byte("foo"))
96         s.kc.PutB([]byte("bar"))
97         s.kc.PutB([]byte("Hello world\n"))
98         s.kc.PutB([]byte(""))
99
100         mt := arvadostest.PathologicalManifest
101
102         for _, testCase := range []rdrTest{
103                 {mt: mt, f: "zzzz", want: os.ErrNotExist},
104                 {mt: mt, f: "frob", want: os.ErrNotExist},
105                 {mt: mt, f: "/segmented/frob", want: "frob"},
106                 {mt: mt, f: "./segmented/frob", want: "frob"},
107                 {mt: mt, f: "/f", want: "f"},
108                 {mt: mt, f: "./f", want: "f"},
109                 {mt: mt, f: "foo bar//baz", want: "foo"},
110                 {mt: mt, f: "foo/zero", want: ""},
111                 {mt: mt, f: "zero@0", want: ""},
112                 {mt: mt, f: "zero@1", want: ""},
113                 {mt: mt, f: "zero@4", want: ""},
114                 {mt: mt, f: "zero@9", want: ""},
115                 {mt: mt, f: "f", want: "f"},
116                 {mt: mt, f: "ooba", want: "ooba"},
117                 {mt: mt, f: "overlapReverse/o", want: "o"},
118                 {mt: mt, f: "overlapReverse/oo", want: "oo"},
119                 {mt: mt, f: "overlapReverse/ofoo", want: "ofoo"},
120                 {mt: mt, f: "foo bar/baz", want: "foo"},
121                 {mt: mt, f: "segmented/frob", want: "frob"},
122                 {mt: mt, f: "segmented/oof", want: "oof"},
123         } {
124                 c.Logf("%#v", testCase)
125                 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
126                 switch want := testCase.want.(type) {
127                 case error:
128                         c.Check(rdr, check.IsNil)
129                         c.Check(err, check.Equals, want)
130                 case string:
131                         buf := make([]byte, len(want))
132                         n, err := io.ReadFull(rdr, buf)
133                         c.Check(err, check.IsNil)
134                         for i := 0; i < 4; i++ {
135                                 c.Check(string(buf), check.Equals, want)
136                                 n, err = rdr.Read(buf)
137                                 c.Check(n, check.Equals, 0)
138                                 c.Check(err, check.Equals, io.EOF)
139                         }
140
141                         for a := len(want) - 2; a >= 0; a-- {
142                                 for b := a + 1; b <= len(want); b++ {
143                                         offset, err := rdr.Seek(int64(a), io.SeekStart)
144                                         c.Logf("...a=%d, b=%d", a, b)
145                                         c.Check(err, check.IsNil)
146                                         c.Check(offset, check.Equals, int64(a))
147                                         buf := make([]byte, b-a)
148                                         n, err := io.ReadFull(rdr, buf)
149                                         c.Check(n, check.Equals, b-a)
150                                         c.Check(string(buf), check.Equals, want[a:b])
151                                 }
152                         }
153                         offset, err := rdr.Seek(-1, io.SeekStart)
154                         c.Check(err, check.NotNil)
155                         c.Check(offset, check.Equals, int64(len(want)))
156
157                         c.Check(rdr.Close(), check.Equals, nil)
158                 }
159         }
160 }
161
162 func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
163         h := md5.New()
164         var testdata []byte
165         buf := make([]byte, 4096)
166         locs := make([]string, len(buf))
167         filesize := 0
168         for i := 0; i < len(locs); i++ {
169                 _, err := rand.Read(buf[:i])
170                 h.Write(buf[:i])
171                 locs[i], _, err = s.kc.PutB(buf[:i])
172                 c.Assert(err, check.IsNil)
173                 filesize += i
174                 testdata = append(testdata, buf[:i]...)
175         }
176         manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n"
177         dataMD5 := h.Sum(nil)
178
179         checkMD5 := md5.New()
180         rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin")
181         c.Assert(err, check.IsNil)
182         defer rdr.Close()
183
184         _, err = io.Copy(checkMD5, rdr)
185         c.Check(err, check.IsNil)
186         _, err = rdr.Read(make([]byte, 1))
187         c.Check(err, check.Equals, io.EOF)
188         c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5)
189
190         size, err := rdr.Seek(0, io.SeekEnd)
191         c.Check(err, check.IsNil)
192         buf = make([]byte, len(testdata))
193         copy(buf, testdata)
194         curPos := size
195         for i := 0; i < 16; i++ {
196                 offset := rand.Intn(len(buf) - 1)
197                 count := rand.Intn(len(buf) - offset)
198                 if rand.Intn(2) == 0 {
199                         curPos, err = rdr.Seek(int64(offset)-curPos, io.SeekCurrent)
200                 } else {
201                         curPos, err = rdr.Seek(int64(offset), io.SeekStart)
202                 }
203                 c.Check(curPos, check.Equals, int64(offset))
204                 for count > 0 {
205                         n, err := rdr.Read(buf[offset : offset+count])
206                         c.Assert(err, check.IsNil)
207                         c.Assert(n > 0, check.Equals, true)
208                         offset += n
209                         count -= n
210                 }
211                 curPos, err = rdr.Seek(0, io.SeekCurrent)
212                 c.Check(curPos, check.Equals, int64(offset))
213         }
214         c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
215         c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
216
217         curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
218         c.Check(err, check.IsNil)
219         c.Check(curPos, check.Equals, size)
220
221         curPos, err = rdr.Seek(8-size, io.SeekCurrent)
222         c.Check(err, check.IsNil)
223         c.Check(curPos, check.Equals, int64(8))
224
225         curPos, err = rdr.Seek(-9, io.SeekCurrent)
226         c.Check(err, check.NotNil)
227         c.Check(curPos, check.Equals, int64(8))
228 }
229
230 func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
231         s.kc.BlockCache = &BlockCache{}
232         s.kc.PutB([]byte("foo"))
233         s.kc.PutB([]byte("bar"))
234         s.kc.PutB([]byte("baz"))
235
236         mt := ". "
237         for i := 0; i < 300; i++ {
238                 mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3 "
239         }
240         mt += "0:2700:foo900.txt\n"
241
242         // Grab the stub server's lock, ensuring our cfReader doesn't
243         // get anything back from its first call to kc.Get() before we
244         // have a chance to call Close().
245         s.handler.lock <- struct{}{}
246         opsBeforeRead := *s.handler.ops
247
248         rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo900.txt")
249         c.Assert(err, check.IsNil)
250
251         firstReadDone := make(chan struct{})
252         go func() {
253                 n, err := rdr.Read(make([]byte, 3))
254                 c.Check(n, check.Equals, 3)
255                 c.Check(err, check.IsNil)
256                 close(firstReadDone)
257         }()
258
259         // Release the stub server's lock. The first GET operation will proceed.
260         <-s.handler.lock
261
262         // Make sure our first read operation consumes the data
263         // received from the first GET.
264         <-firstReadDone
265
266         err = rdr.Close()
267         c.Check(err, check.IsNil)
268
269         // Stub should have handled exactly one GET request.
270         c.Check(*s.handler.ops, check.Equals, opsBeforeRead+1)
271 }
272
273 func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
274         manifest := ". ffffffffffffffffffffffffffffffff+1 0:1:notfound.txt\n"
275         buf := make([]byte, 1)
276         rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "notfound.txt")
277         c.Check(err, check.IsNil)
278         for i := 0; i < 2; i++ {
279                 _, err = io.ReadFull(rdr, buf)
280                 c.Check(err, check.NotNil)
281                 c.Check(err, check.Not(check.Equals), io.EOF)
282         }
283         c.Check(rdr.Close(), check.IsNil)
284 }