2798: Checkpoint commit, tests for ReadIntoBuffer() and Transfer() pass.
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "flag"
5         //"fmt"
6         . "gopkg.in/check.v1"
7         "io"
8         //"log"
9         "os"
10         "os/exec"
11         "testing"
12         "time"
13 )
14
15 // Gocheck boilerplate
16 func Test(t *testing.T) { TestingT(t) }
17
18 // Gocheck boilerplate
19 var _ = Suite(&ServerRequiredSuite{})
20 var _ = Suite(&StandaloneSuite{})
21
22 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
23
24 // Tests that require the Keep server running
25 type ServerRequiredSuite struct{}
26
27 // Standalone tests
28 type StandaloneSuite struct{}
29
30 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
31         if *no_server {
32                 c.Skip("Skipping tests that require server")
33         } else {
34                 os.Chdir(os.ExpandEnv("$GOPATH../python"))
35                 exec.Command("python", "run_test_server.py", "start").Run()
36                 exec.Command("python", "run_test_server.py", "start_keep").Run()
37         }
38 }
39
40 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
41         os.Chdir(os.ExpandEnv("$GOPATH../python"))
42         exec.Command("python", "run_test_server.py", "stop_keep").Run()
43         exec.Command("python", "run_test_server.py", "stop").Run()
44 }
45
46 func (s *ServerRequiredSuite) TestInit(c *C) {
47         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
48         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
49         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
50
51         kc, err := MakeKeepClient()
52         c.Assert(kc.ApiServer, Equals, "localhost:3001")
53         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
54         c.Assert(kc.ApiInsecure, Equals, false)
55
56         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
57
58         kc, err = MakeKeepClient()
59         c.Assert(kc.ApiServer, Equals, "localhost:3001")
60         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
61         c.Assert(kc.ApiInsecure, Equals, true)
62
63         c.Assert(err, Equals, nil)
64         c.Assert(len(kc.Service_roots), Equals, 2)
65         c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
66         c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
67 }
68
69 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
70         kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
71
72         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
73         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
74         c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
75
76         // "bar" 37b51d194a7513e45b56f6524f2d51f2
77         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
78         c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
79 }
80
81 func ReadIntoBufferHelper(c *C, bufsize int) {
82         buffer := make([]byte, bufsize)
83
84         reader, writer := io.Pipe()
85         slices := make(chan ReaderSlice)
86
87         go ReadIntoBuffer(buffer, reader, slices)
88
89         {
90                 out := make([]byte, 128)
91                 for i := 0; i < 128; i += 1 {
92                         out[i] = byte(i)
93                 }
94                 writer.Write(out)
95                 s1 := <-slices
96                 c.Check(len(s1.slice), Equals, 128)
97                 c.Check(s1.reader_error, Equals, nil)
98                 for i := 0; i < 128; i += 1 {
99                         c.Check(s1.slice[i], Equals, byte(i))
100                 }
101                 for i := 0; i < len(buffer); i += 1 {
102                         if i < 128 {
103                                 c.Check(buffer[i], Equals, byte(i))
104                         } else {
105                                 c.Check(buffer[i], Equals, byte(0))
106                         }
107                 }
108         }
109         {
110                 out := make([]byte, 96)
111                 for i := 0; i < 96; i += 1 {
112                         out[i] = byte(i / 2)
113                 }
114                 writer.Write(out)
115                 s1 := <-slices
116                 c.Check(len(s1.slice), Equals, 96)
117                 c.Check(s1.reader_error, Equals, nil)
118                 for i := 0; i < 96; i += 1 {
119                         c.Check(s1.slice[i], Equals, byte(i/2))
120                 }
121                 for i := 0; i < len(buffer); i += 1 {
122                         if i < 128 {
123                                 c.Check(buffer[i], Equals, byte(i))
124                         } else if i < (128 + 96) {
125                                 c.Check(buffer[i], Equals, byte((i-128)/2))
126                         } else {
127                                 c.Check(buffer[i], Equals, byte(0))
128                         }
129                 }
130         }
131         {
132                 writer.Close()
133                 s1 := <-slices
134                 c.Check(len(s1.slice), Equals, 0)
135                 c.Check(s1.reader_error, Equals, io.EOF)
136         }
137 }
138
139 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
140         ReadIntoBufferHelper(c, 512)
141         ReadIntoBufferHelper(c, 225)
142         ReadIntoBufferHelper(c, 224)
143 }
144
145 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
146         buffer := make([]byte, 223)
147         reader, writer := io.Pipe()
148         slices := make(chan ReaderSlice)
149
150         go ReadIntoBuffer(buffer, reader, slices)
151
152         {
153                 out := make([]byte, 128)
154                 for i := 0; i < 128; i += 1 {
155                         out[i] = byte(i)
156                 }
157                 writer.Write(out)
158                 s1 := <-slices
159                 c.Check(len(s1.slice), Equals, 128)
160                 c.Check(s1.reader_error, Equals, nil)
161                 for i := 0; i < 128; i += 1 {
162                         c.Check(s1.slice[i], Equals, byte(i))
163                 }
164                 for i := 0; i < len(buffer); i += 1 {
165                         if i < 128 {
166                                 c.Check(buffer[i], Equals, byte(i))
167                         } else {
168                                 c.Check(buffer[i], Equals, byte(0))
169                         }
170                 }
171         }
172         {
173                 out := make([]byte, 96)
174                 for i := 0; i < 96; i += 1 {
175                         out[i] = byte(i / 2)
176                 }
177
178                 // Write will deadlock because it can't write all the data, so
179                 // spin it off to a goroutine
180                 go writer.Write(out)
181                 s1 := <-slices
182
183                 c.Check(len(s1.slice), Equals, 95)
184                 c.Check(s1.reader_error, Equals, nil)
185                 for i := 0; i < 95; i += 1 {
186                         c.Check(s1.slice[i], Equals, byte(i/2))
187                 }
188                 for i := 0; i < len(buffer); i += 1 {
189                         if i < 128 {
190                                 c.Check(buffer[i], Equals, byte(i))
191                         } else if i < (128 + 95) {
192                                 c.Check(buffer[i], Equals, byte((i-128)/2))
193                         } else {
194                                 c.Check(buffer[i], Equals, byte(0))
195                         }
196                 }
197         }
198         {
199                 writer.Close()
200                 s1 := <-slices
201                 c.Check(len(s1.slice), Equals, 0)
202                 c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
203         }
204
205 }
206
207 func (s *StandaloneSuite) TestTransfer(c *C) {
208         reader, writer := io.Pipe()
209
210         // Buffer for reads from 'r'
211         buffer := make([]byte, 512)
212
213         // Read requests on Transfer() buffer
214         requests := make(chan ReadRequest)
215         defer close(requests)
216
217         // Reporting reader error states
218         reader_status := make(chan error)
219
220         go Transfer(buffer, reader, requests, reader_status)
221
222         br1 := MakeBufferReader(requests)
223         out := make([]byte, 128)
224
225         {
226                 // Write some data, and read into a buffer shorter than
227                 // available data
228                 for i := 0; i < 128; i += 1 {
229                         out[i] = byte(i)
230                 }
231
232                 writer.Write(out[:100])
233
234                 in := make([]byte, 64)
235                 n, err := br1.Read(in)
236
237                 c.Check(n, Equals, 64)
238                 c.Check(err, Equals, nil)
239
240                 for i := 0; i < 64; i += 1 {
241                         c.Check(in[i], Equals, out[i])
242                 }
243         }
244
245         {
246                 // Write some more data, and read into buffer longer than
247                 // available data
248                 in := make([]byte, 64)
249                 n, err := br1.Read(in)
250                 c.Check(n, Equals, 36)
251                 c.Check(err, Equals, nil)
252
253                 for i := 0; i < 36; i += 1 {
254                         c.Check(in[i], Equals, out[64+i])
255                 }
256
257         }
258
259         {
260                 // Test read before write
261                 type Rd struct {
262                         n   int
263                         err error
264                 }
265                 rd := make(chan Rd)
266                 in := make([]byte, 64)
267
268                 go func() {
269                         n, err := br1.Read(in)
270                         rd <- Rd{n, err}
271                 }()
272
273                 time.Sleep(100 * time.Millisecond)
274                 writer.Write(out[100:])
275
276                 got := <-rd
277
278                 c.Check(got.n, Equals, 28)
279                 c.Check(got.err, Equals, nil)
280
281                 for i := 0; i < 28; i += 1 {
282                         c.Check(in[i], Equals, out[100+i])
283                 }
284         }
285
286         br2 := MakeBufferReader(requests)
287         {
288                 // Test 'catch up' reader
289                 in := make([]byte, 256)
290                 n, err := br2.Read(in)
291
292                 c.Check(n, Equals, 128)
293                 c.Check(err, Equals, nil)
294
295                 for i := 0; i < 128; i += 1 {
296                         c.Check(in[i], Equals, out[i])
297                 }
298         }
299
300         {
301                 // Test closing the reader
302                 writer.Close()
303                 status := <-reader_status
304                 c.Check(status, Equals, io.EOF)
305
306                 in := make([]byte, 256)
307                 n1, err1 := br1.Read(in)
308                 n2, err2 := br2.Read(in)
309                 c.Check(n1, Equals, 0)
310                 c.Check(err1, Equals, io.EOF)
311                 c.Check(n2, Equals, 0)
312                 c.Check(err2, Equals, io.EOF)
313         }
314
315         {
316                 // Test 'catch up' reader after closing
317                 br3 := MakeBufferReader(requests)
318                 in := make([]byte, 256)
319                 n, err := br3.Read(in)
320
321                 c.Check(n, Equals, 128)
322                 c.Check(err, Equals, nil)
323
324                 for i := 0; i < 128; i += 1 {
325                         c.Check(in[i], Equals, out[i])
326                 }
327
328                 n, err = br3.Read(in)
329
330                 c.Check(n, Equals, 0)
331                 c.Check(err, Equals, io.EOF)
332         }
333 }
334
335 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
336         reader, writer := io.Pipe()
337
338         // Buffer for reads from 'r'
339         buffer := make([]byte, 100)
340
341         // Read requests on Transfer() buffer
342         requests := make(chan ReadRequest)
343         defer close(requests)
344
345         // Reporting reader error states
346         reader_status := make(chan error)
347
348         go Transfer(buffer, reader, requests, reader_status)
349
350         out := make([]byte, 101)
351         go writer.Write(out)
352
353         status := <-reader_status
354         c.Check(status, Equals, io.ErrShortBuffer)
355 }