2798: Read requests from Transfer() now return a slice. Added BufferReader
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "flag"
5         "fmt"
6         . "gopkg.in/check.v1"
7         "io"
8         "io/ioutil"
9         "log"
10         "net"
11         "net/http"
12         "os"
13         "os/exec"
14         "testing"
15         "time"
16 )
17
18 // Gocheck boilerplate
19 func Test(t *testing.T) { TestingT(t) }
20
21 // Gocheck boilerplate
22 var _ = Suite(&ServerRequiredSuite{})
23 var _ = Suite(&StandaloneSuite{})
24
// no_server backs the -no-server command line flag. When it is set,
// ServerRequiredSuite.SetUpSuite skips the suite instead of starting the
// test servers.
var no_server = flag.Bool("no-server", false, "Skip 'ServerRequiredSuite'")
26
type (
	// ServerRequiredSuite holds the tests that need a running Keep server.
	ServerRequiredSuite struct{}

	// StandaloneSuite holds the tests that run without any external server.
	StandaloneSuite struct{}
)
32
33 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
34         if *no_server {
35                 c.Skip("Skipping tests that require server")
36         } else {
37                 os.Chdir(os.ExpandEnv("$GOPATH../python"))
38                 exec.Command("python", "run_test_server.py", "start").Run()
39                 exec.Command("python", "run_test_server.py", "start_keep").Run()
40         }
41 }
42
43 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
44         os.Chdir(os.ExpandEnv("$GOPATH../python"))
45         exec.Command("python", "run_test_server.py", "stop_keep").Run()
46         exec.Command("python", "run_test_server.py", "stop").Run()
47 }
48
49 func (s *ServerRequiredSuite) TestInit(c *C) {
50         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
51         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
52         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
53
54         kc, err := MakeKeepClient()
55         c.Assert(kc.ApiServer, Equals, "localhost:3001")
56         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
57         c.Assert(kc.ApiInsecure, Equals, false)
58
59         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
60
61         kc, err = MakeKeepClient()
62         c.Assert(kc.ApiServer, Equals, "localhost:3001")
63         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
64         c.Assert(kc.ApiInsecure, Equals, true)
65
66         c.Assert(err, Equals, nil)
67         c.Assert(len(kc.Service_roots), Equals, 2)
68         c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
69         c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
70 }
71
72 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
73         kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
74
75         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
76         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
77         c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
78
79         // "bar" 37b51d194a7513e45b56f6524f2d51f2
80         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
81         c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
82 }
83
84 func ReadIntoBufferHelper(c *C, bufsize int) {
85         buffer := make([]byte, bufsize)
86
87         reader, writer := io.Pipe()
88         slices := make(chan ReaderSlice)
89
90         go ReadIntoBuffer(buffer, reader, slices)
91
92         {
93                 out := make([]byte, 128)
94                 for i := 0; i < 128; i += 1 {
95                         out[i] = byte(i)
96                 }
97                 writer.Write(out)
98                 s1 := <-slices
99                 c.Check(len(s1.slice), Equals, 128)
100                 c.Check(s1.reader_error, Equals, nil)
101                 for i := 0; i < 128; i += 1 {
102                         c.Check(s1.slice[i], Equals, byte(i))
103                 }
104                 for i := 0; i < len(buffer); i += 1 {
105                         if i < 128 {
106                                 c.Check(buffer[i], Equals, byte(i))
107                         } else {
108                                 c.Check(buffer[i], Equals, byte(0))
109                         }
110                 }
111         }
112         {
113                 out := make([]byte, 96)
114                 for i := 0; i < 96; i += 1 {
115                         out[i] = byte(i / 2)
116                 }
117                 writer.Write(out)
118                 s1 := <-slices
119                 c.Check(len(s1.slice), Equals, 96)
120                 c.Check(s1.reader_error, Equals, nil)
121                 for i := 0; i < 96; i += 1 {
122                         c.Check(s1.slice[i], Equals, byte(i/2))
123                 }
124                 for i := 0; i < len(buffer); i += 1 {
125                         if i < 128 {
126                                 c.Check(buffer[i], Equals, byte(i))
127                         } else if i < (128 + 96) {
128                                 c.Check(buffer[i], Equals, byte((i-128)/2))
129                         } else {
130                                 c.Check(buffer[i], Equals, byte(0))
131                         }
132                 }
133         }
134         {
135                 writer.Close()
136                 s1 := <-slices
137                 c.Check(len(s1.slice), Equals, 0)
138                 c.Check(s1.reader_error, Equals, io.EOF)
139         }
140 }
141
142 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
143         ReadIntoBufferHelper(c, 512)
144         ReadIntoBufferHelper(c, 225)
145         ReadIntoBufferHelper(c, 224)
146 }
147
148 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
149         buffer := make([]byte, 223)
150         reader, writer := io.Pipe()
151         slices := make(chan ReaderSlice)
152
153         go ReadIntoBuffer(buffer, reader, slices)
154
155         {
156                 out := make([]byte, 128)
157                 for i := 0; i < 128; i += 1 {
158                         out[i] = byte(i)
159                 }
160                 writer.Write(out)
161                 s1 := <-slices
162                 c.Check(len(s1.slice), Equals, 128)
163                 c.Check(s1.reader_error, Equals, nil)
164                 for i := 0; i < 128; i += 1 {
165                         c.Check(s1.slice[i], Equals, byte(i))
166                 }
167                 for i := 0; i < len(buffer); i += 1 {
168                         if i < 128 {
169                                 c.Check(buffer[i], Equals, byte(i))
170                         } else {
171                                 c.Check(buffer[i], Equals, byte(0))
172                         }
173                 }
174         }
175         {
176                 out := make([]byte, 96)
177                 for i := 0; i < 96; i += 1 {
178                         out[i] = byte(i / 2)
179                 }
180
181                 // Write will deadlock because it can't write all the data, so
182                 // spin it off to a goroutine
183                 go writer.Write(out)
184                 s1 := <-slices
185
186                 c.Check(len(s1.slice), Equals, 95)
187                 c.Check(s1.reader_error, Equals, nil)
188                 for i := 0; i < 95; i += 1 {
189                         c.Check(s1.slice[i], Equals, byte(i/2))
190                 }
191                 for i := 0; i < len(buffer); i += 1 {
192                         if i < 128 {
193                                 c.Check(buffer[i], Equals, byte(i))
194                         } else if i < (128 + 95) {
195                                 c.Check(buffer[i], Equals, byte((i-128)/2))
196                         } else {
197                                 c.Check(buffer[i], Equals, byte(0))
198                         }
199                 }
200         }
201         {
202                 writer.Close()
203                 s1 := <-slices
204                 c.Check(len(s1.slice), Equals, 0)
205                 c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
206         }
207
208 }
209
210 func (s *StandaloneSuite) TestTransfer(c *C) {
211         reader, writer := io.Pipe()
212
213         // Buffer for reads from 'r'
214         buffer := make([]byte, 512)
215
216         // Read requests on Transfer() buffer
217         requests := make(chan ReadRequest)
218         defer close(requests)
219
220         // Reporting reader error states
221         reader_status := make(chan error)
222
223         go Transfer(buffer, reader, requests, reader_status)
224
225         br1 := MakeBufferReader(requests)
226         out := make([]byte, 128)
227
228         {
229                 // Write some data, and read into a buffer shorter than
230                 // available data
231                 for i := 0; i < 128; i += 1 {
232                         out[i] = byte(i)
233                 }
234
235                 writer.Write(out[:100])
236
237                 in := make([]byte, 64)
238                 n, err := br1.Read(in)
239
240                 c.Check(n, Equals, 64)
241                 c.Check(err, Equals, nil)
242
243                 for i := 0; i < 64; i += 1 {
244                         c.Check(in[i], Equals, out[i])
245                 }
246         }
247
248         {
249                 // Write some more data, and read into buffer longer than
250                 // available data
251                 in := make([]byte, 64)
252                 n, err := br1.Read(in)
253                 c.Check(n, Equals, 36)
254                 c.Check(err, Equals, nil)
255
256                 for i := 0; i < 36; i += 1 {
257                         c.Check(in[i], Equals, out[64+i])
258                 }
259
260         }
261
262         {
263                 // Test read before write
264                 type Rd struct {
265                         n   int
266                         err error
267                 }
268                 rd := make(chan Rd)
269                 in := make([]byte, 64)
270
271                 go func() {
272                         n, err := br1.Read(in)
273                         rd <- Rd{n, err}
274                 }()
275
276                 time.Sleep(100 * time.Millisecond)
277                 writer.Write(out[100:])
278
279                 got := <-rd
280
281                 c.Check(got.n, Equals, 28)
282                 c.Check(got.err, Equals, nil)
283
284                 for i := 0; i < 28; i += 1 {
285                         c.Check(in[i], Equals, out[100+i])
286                 }
287         }
288
289         br2 := MakeBufferReader(requests)
290         {
291                 // Test 'catch up' reader
292                 in := make([]byte, 256)
293                 n, err := br2.Read(in)
294
295                 c.Check(n, Equals, 128)
296                 c.Check(err, Equals, nil)
297
298                 for i := 0; i < 128; i += 1 {
299                         c.Check(in[i], Equals, out[i])
300                 }
301         }
302
303         {
304                 // Test closing the reader
305                 writer.Close()
306                 status := <-reader_status
307                 c.Check(status, Equals, io.EOF)
308
309                 in := make([]byte, 256)
310                 n1, err1 := br1.Read(in)
311                 n2, err2 := br2.Read(in)
312                 c.Check(n1, Equals, 0)
313                 c.Check(err1, Equals, io.EOF)
314                 c.Check(n2, Equals, 0)
315                 c.Check(err2, Equals, io.EOF)
316         }
317
318         {
319                 // Test 'catch up' reader after closing
320                 br3 := MakeBufferReader(requests)
321                 in := make([]byte, 256)
322                 n, err := br3.Read(in)
323
324                 c.Check(n, Equals, 128)
325                 c.Check(err, Equals, nil)
326
327                 for i := 0; i < 128; i += 1 {
328                         c.Check(in[i], Equals, out[i])
329                 }
330
331                 n, err = br3.Read(in)
332
333                 c.Check(n, Equals, 0)
334                 c.Check(err, Equals, io.EOF)
335         }
336 }
337
338 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
339         reader, writer := io.Pipe()
340
341         // Buffer for reads from 'r'
342         buffer := make([]byte, 100)
343
344         // Read requests on Transfer() buffer
345         requests := make(chan ReadRequest)
346         defer close(requests)
347
348         // Reporting reader error states
349         reader_status := make(chan error)
350
351         go Transfer(buffer, reader, requests, reader_status)
352
353         out := make([]byte, 101)
354         go writer.Write(out)
355
356         status := <-reader_status
357         c.Check(status, Equals, io.ErrShortBuffer)
358 }
359
360 type StubHandler struct {
361         c              *C
362         expectPath     string
363         expectApiToken string
364         expectBody     string
365         handled        chan bool
366 }
367
368 func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
369         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
370         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
371         body, err := ioutil.ReadAll(req.Body)
372         this.c.Check(err, Equals, nil)
373         this.c.Check(body, DeepEquals, []byte(this.expectBody))
374         resp.WriteHeader(200)
375         this.handled <- true
376 }
377
378 func UploadToStubHelper(c *C, f func(*KeepClient, string, StubHandler,
379         io.ReadCloser, io.WriteCloser, chan UploadError)) {
380
381         st := StubHandler{
382                 c,
383                 "acbd18db4cc2f85cedef654fccc4a4d8",
384                 "abc123",
385                 "foo",
386                 make(chan bool)}
387         server := http.Server{Handler: st}
388
389         listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
390         defer listener.Close()
391
392         url := fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
393
394         go server.Serve(listener)
395         kc, _ := MakeKeepClient()
396         kc.ApiToken = "abc123"
397
398         reader, writer := io.Pipe()
399         upload_status := make(chan UploadError)
400
401         f(kc, url, st, reader, writer, upload_status)
402 }
403
404 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
405         log.Printf("Started TestUploadToStubKeepServer")
406
407         UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
408                 reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
409
410                 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status)
411
412                 writer.Write([]byte("foo"))
413                 writer.Close()
414
415                 <-st.handled
416                 status := <-upload_status
417                 c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
418         })
419 }
420
421 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
422         log.Printf("Started TestUploadToStubKeepServerBufferReader")
423
424         UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
425                 reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
426
427                 // Buffer for reads from 'r'
428                 buffer := make([]byte, 512)
429
430                 // Read requests on Transfer() buffer
431                 requests := make(chan ReadRequest)
432                 defer close(requests)
433
434                 // Reporting reader error states
435                 reader_status := make(chan error)
436
437                 go Transfer(buffer, reader, requests, reader_status)
438
439                 br1 := MakeBufferReader(requests)
440
441                 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status)
442
443                 writer.Write([]byte("foo"))
444                 writer.Close()
445
446                 <-reader_status
447                 <-st.handled
448
449                 status := <-upload_status
450                 c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
451
452                 //c.Check(true, Equals, false)
453         })
454 }
455
// FailHandler is an http.Handler that rejects every request.
type FailHandler struct {
	// handled receives true after each request has been answered.
	handled chan bool
}

// ServeHTTP replies with status 400 and then signals on the handled channel.
func (h FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusBadRequest)
	h.handled <- true
}
464
465 /*func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
466         log.Printf("blup")
467
468         c.Check(true, Equals, false)
469
470         log.Printf("blug")
471
472         st := FailHandler{make(chan bool)}
473         server := http.Server{Handler: st}
474
475         listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
476         defer listener.Close()
477
478         go server.Serve(listener)
479         kc, _ := MakeKeepClient()
480         kc.ApiToken = "abc123"
481
482         reader, writer := io.Pipe()
483         upload_status := make(chan UploadError)
484
485         go kc.uploadToKeepServer(fmt.Sprintf("http://localhost:%s", listener.Addr().String()), "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
486
487         log.Printf("Writing 1")
488
489         writer.Write([]byte("foo"))
490
491         log.Printf("Writing 2")
492
493         writer.Close()
494
495         log.Printf("Writing 3")
496
497         <-st.handled
498
499         log.Printf("Handled?!")
500
501         status := <-upload_status
502         c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
503 }*/