5824: Fix up error checking and early-close behavior in CollectionFileReader.
[arvados.git] / sdk / go / keepclient / collectionreader_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "os"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
12         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
13         check "gopkg.in/check.v1"
14 )
15
16 var _ = check.Suite(&IntegrationSuite{})
17
18 // IntegrationSuite tests need an API server
19 type IntegrationSuite struct{}
20
// SuccessHandler is an in-memory stub HTTP handler: PUT stores the
// request body under a key derived from its content, and GET returns
// a previously stored body.
type SuccessHandler struct {
	disk map[string][]byte // stored bodies, keyed by "<md5 hex>+<size>"
	lock chan struct{}     // channel with buffer==1: full when an operation is in progress.
	ops  *int              // number of operations completed
}

// ServeHTTP serves PUT and GET requests from the in-memory store.
//
// PUT reads the whole request body, stores it under the key
// "<md5 hex>+<length>" and responds with that key. If the body cannot
// be read it responds with status 500 and stores nothing.
//
// GET looks up the request path (without its leading "/") and
// responds with the stored bytes, or with 404 if nothing is stored
// under that key.
//
// Any other method gets 405. If h.ops is non-nil, it is incremented
// once for each PUT that stores a body and once for each GET, whether
// or not the key was found.
func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	switch req.Method {
	case "PUT":
		buf, err := ioutil.ReadAll(req.Body)
		if err != nil {
			resp.WriteHeader(http.StatusInternalServerError)
			return
		}
		pdh := fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf))
		// Sending on the 1-buffered channel acquires the lock;
		// receiving from it releases the lock.
		h.lock <- struct{}{}
		h.disk[pdh] = buf
		if h.ops != nil {
			(*h.ops)++
		}
		<-h.lock
		resp.Write([]byte(pdh))
	case "GET":
		// Strip the leading "/" only when it is there: slicing an
		// empty path with [1:] would panic.
		pdh := req.URL.Path
		if len(pdh) > 0 && pdh[0] == '/' {
			pdh = pdh[1:]
		}
		h.lock <- struct{}{}
		buf, ok := h.disk[pdh]
		if h.ops != nil {
			(*h.ops)++
		}
		<-h.lock
		if !ok {
			resp.WriteHeader(http.StatusNotFound)
			return
		}
		resp.Write(buf)
	default:
		resp.WriteHeader(http.StatusMethodNotAllowed)
	}
}
60
// rdrTest is one table entry for the CollectionFileReader tests: a
// manifest, a file to open in it, and the expected outcome.
type rdrTest struct {
	// manifest text
	mt string
	// filename
	f string
	// error or string to expect
	want interface{}
}
66
67 func StubWithFakeServers(kc *KeepClient, h http.Handler) {
68         localRoots := make(map[string]string)
69         for i, k := range RunSomeFakeKeepServers(h, 4) {
70                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
71         }
72         kc.SetServiceRoots(localRoots, localRoots, nil)
73 }
74
75 func (s *ServerRequiredSuite) TestCollectionReaderContent(c *check.C) {
76         arv, err := arvadosclient.MakeArvadosClient()
77         c.Assert(err, check.IsNil)
78         arv.ApiToken = arvadostest.ActiveToken
79
80         kc, err := MakeKeepClient(&arv)
81         c.Assert(err, check.IsNil)
82
83         {
84                 h := SuccessHandler{
85                         disk: make(map[string][]byte),
86                         lock: make(chan struct{}, 1),
87                 }
88                 StubWithFakeServers(kc, h)
89                 kc.PutB([]byte("foo"))
90                 kc.PutB([]byte("bar"))
91                 kc.PutB([]byte("Hello world\n"))
92                 kc.PutB([]byte(""))
93         }
94
95         mt := arvadostest.PathologicalManifest
96
97         for _, testCase := range []rdrTest{
98                 {mt: mt, f: "zzzz", want: os.ErrNotExist},
99                 {mt: mt, f: "frob", want: os.ErrNotExist},
100                 {mt: mt, f: "/segmented/frob", want: os.ErrNotExist},
101                 {mt: mt, f: "./segmented/frob", want: os.ErrNotExist},
102                 {mt: mt, f: "/f", want: os.ErrNotExist},
103                 {mt: mt, f: "./f", want: os.ErrNotExist},
104                 {mt: mt, f: "foo bar//baz", want: os.ErrNotExist},
105                 {mt: mt, f: "foo/zero", want: ""},
106                 {mt: mt, f: "zero@0", want: ""},
107                 {mt: mt, f: "zero@1", want: ""},
108                 {mt: mt, f: "zero@4", want: ""},
109                 {mt: mt, f: "zero@9", want: ""},
110                 {mt: mt, f: "f", want: "f"},
111                 {mt: mt, f: "ooba", want: "ooba"},
112                 {mt: mt, f: "overlapReverse/o", want: "o"},
113                 {mt: mt, f: "overlapReverse/oo", want: "oo"},
114                 {mt: mt, f: "overlapReverse/ofoo", want: "ofoo"},
115                 {mt: mt, f: "foo bar/baz", want: "foo"},
116                 {mt: mt, f: "segmented/frob", want: "frob"},
117                 {mt: mt, f: "segmented/oof", want: "oof"},
118         } {
119                 rdr, err := kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
120                 switch want := testCase.want.(type) {
121                 case error:
122                         c.Check(rdr, check.IsNil)
123                         c.Check(err, check.Equals, want)
124                 case string:
125                         buf := make([]byte, len(want))
126                         n, err := io.ReadFull(rdr, buf)
127                         c.Check(err, check.IsNil)
128                         for i := 0; i < 4; i++ {
129                                 c.Check(string(buf), check.Equals, want)
130                                 n, err = rdr.Read(buf)
131                                 c.Check(n, check.Equals, 0)
132                                 c.Check(err, check.Equals, io.EOF)
133                         }
134                         c.Check(rdr.Close(), check.Equals, nil)
135                 }
136         }
137 }
138
139 func (s *ServerRequiredSuite) TestCollectionReaderCloseEarly(c *check.C) {
140         arv, err := arvadosclient.MakeArvadosClient()
141         c.Assert(err, check.IsNil)
142         arv.ApiToken = arvadostest.ActiveToken
143
144         kc, err := MakeKeepClient(&arv)
145         c.Assert(err, check.IsNil)
146
147         h := SuccessHandler{
148                 disk: make(map[string][]byte),
149                 lock: make(chan struct{}, 1),
150                 ops: new(int),
151         }
152         StubWithFakeServers(kc, h)
153         kc.PutB([]byte("foo"))
154
155         mt := ". "
156         for i := 0; i < 1000; i++ {
157                 mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 "
158         }
159         mt += "0:3000:foo1000.txt\n"
160
161         // Grab the stub server's lock, ensuring our cfReader doesn't
162         // get anything back from its first call to kc.Get() before we
163         // have a chance to call Close().
164         h.lock <- struct{}{}
165         opsBeforeRead := *h.ops
166
167         rdr, err := kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
168         c.Assert(err, check.IsNil)
169         err = rdr.Close()
170         c.Assert(err, check.IsNil)
171         c.Assert(rdr.Error(), check.IsNil)
172
173         // Release the stub server's lock. The first GET operation will proceed.
174         <-h.lock
175
176         // doGet() should close toRead before sending any more bufs to it.
177         if what, ok := <-rdr.toRead;  ok {
178                 c.Errorf("Got %+v, expected toRead to be closed", what)
179         }
180
181         // Stub should have handled exactly one GET request.
182         c.Assert(*h.ops, check.Equals, opsBeforeRead+1)
183 }