14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
16 check "gopkg.in/check.v1"
19 var _ = check.Suite(&CollectionReaderUnit{})
21 type CollectionReaderUnit struct {
22 arv *arvadosclient.ArvadosClient
24 handler SuccessHandler
27 func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
29 s.arv, err = arvadosclient.MakeArvadosClient()
30 c.Assert(err, check.IsNil)
31 s.arv.ApiToken = arvadostest.ActiveToken
33 s.kc, err = MakeKeepClient(s.arv)
34 c.Assert(err, check.IsNil)
36 s.handler = SuccessHandler{
37 disk: make(map[string][]byte),
38 lock: make(chan struct{}, 1),
41 localRoots := make(map[string]string)
42 for i, k := range RunSomeFakeKeepServers(s.handler, 4) {
43 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
45 s.kc.SetServiceRoots(localRoots, localRoots, nil)
48 type SuccessHandler struct {
49 disk map[string][]byte
50 lock chan struct{} // channel with buffer==1: full when an operation is in progress.
51 ops *int // number of operations completed
54 func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
57 buf, err := ioutil.ReadAll(req.Body)
62 pdh := fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf))
69 resp.Write([]byte(pdh))
71 pdh := req.URL.Path[1:]
73 buf, ok := h.disk[pdh]
79 resp.WriteHeader(http.StatusNotFound)
84 resp.WriteHeader(http.StatusMethodNotAllowed)
89 mt string // manifest text
91 want interface{} // error or string to expect
94 func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
95 s.kc.PutB([]byte("foo"))
96 s.kc.PutB([]byte("bar"))
97 s.kc.PutB([]byte("Hello world\n"))
100 mt := arvadostest.PathologicalManifest
102 for _, testCase := range []rdrTest{
103 {mt: mt, f: "zzzz", want: os.ErrNotExist},
104 {mt: mt, f: "frob", want: os.ErrNotExist},
105 {mt: mt, f: "/segmented/frob", want: os.ErrNotExist},
106 {mt: mt, f: "./segmented/frob", want: os.ErrNotExist},
107 {mt: mt, f: "/f", want: os.ErrNotExist},
108 {mt: mt, f: "./f", want: os.ErrNotExist},
109 {mt: mt, f: "foo bar//baz", want: os.ErrNotExist},
110 {mt: mt, f: "foo/zero", want: ""},
111 {mt: mt, f: "zero@0", want: ""},
112 {mt: mt, f: "zero@1", want: ""},
113 {mt: mt, f: "zero@4", want: ""},
114 {mt: mt, f: "zero@9", want: ""},
115 {mt: mt, f: "f", want: "f"},
116 {mt: mt, f: "ooba", want: "ooba"},
117 {mt: mt, f: "overlapReverse/o", want: "o"},
118 {mt: mt, f: "overlapReverse/oo", want: "oo"},
119 {mt: mt, f: "overlapReverse/ofoo", want: "ofoo"},
120 {mt: mt, f: "foo bar/baz", want: "foo"},
121 {mt: mt, f: "segmented/frob", want: "frob"},
122 {mt: mt, f: "segmented/oof", want: "oof"},
124 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
125 switch want := testCase.want.(type) {
127 c.Check(rdr, check.IsNil)
128 c.Check(err, check.Equals, want)
130 buf := make([]byte, len(want))
131 n, err := io.ReadFull(rdr, buf)
132 c.Check(err, check.IsNil)
133 for i := 0; i < 4; i++ {
134 c.Check(string(buf), check.Equals, want)
135 n, err = rdr.Read(buf)
136 c.Check(n, check.Equals, 0)
137 c.Check(err, check.Equals, io.EOF)
139 c.Check(rdr.Close(), check.Equals, nil)
144 func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
146 buf := make([]byte, 4096)
147 locs := make([]string, len(buf))
149 for i := 0; i < len(locs); i++ {
150 _, err := io.ReadFull(rand.Reader, buf[:i])
151 c.Assert(err, check.IsNil)
153 locs[i], _, err = s.kc.PutB(buf[:i])
154 c.Assert(err, check.IsNil)
157 manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n"
158 dataMD5 := h.Sum(nil)
160 checkMD5 := md5.New()
161 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin")
162 c.Check(err, check.IsNil)
163 _, err = io.Copy(checkMD5, rdr)
164 c.Check(err, check.IsNil)
165 _, err = rdr.Read(make([]byte, 1))
166 c.Check(err, check.Equals, io.EOF)
167 c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5)
170 func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
171 s.kc.PutB([]byte("foo"))
174 for i := 0; i < 1000; i++ {
175 mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 "
177 mt += "0:3000:foo1000.txt\n"
179 // Grab the stub server's lock, ensuring our cfReader doesn't
180 // get anything back from its first call to kc.Get() before we
181 // have a chance to call Close().
182 s.handler.lock <- struct{}{}
183 opsBeforeRead := *s.handler.ops
185 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
186 c.Assert(err, check.IsNil)
188 firstReadDone := make(chan struct{})
190 rdr.Read(make([]byte, 6))
191 firstReadDone <- struct{}{}
194 c.Assert(err, check.IsNil)
195 c.Assert(rdr.(*cfReader).Error(), check.IsNil)
197 // Release the stub server's lock. The first GET operation will proceed.
200 // Make sure our first read operation consumes the data
201 // received from the first GET.
204 // doGet() should close toRead before sending any more bufs to it.
205 if what, ok := <-rdr.(*cfReader).toRead; ok {
206 c.Errorf("Got %q, expected toRead to be closed", what)
209 // Stub should have handled exactly one GET request.
210 c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1)
213 func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
214 manifest := ". ffffffffffffffffffffffffffffffff+1 0:1:notfound.txt\n"
215 buf := make([]byte, 1)
216 rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "notfound.txt")
217 c.Check(err, check.IsNil)
218 for i := 0; i < 2; i++ {
219 _, err = io.ReadFull(rdr, buf)
220 c.Check(err, check.NotNil)
221 c.Check(err, check.Not(check.Equals), io.EOF)
223 c.Check(rdr.Close(), check.NotNil)