2798: Added uploadToKeepServer() test
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "flag"
5         "fmt"
6         . "gopkg.in/check.v1"
7         "io"
8         "io/ioutil"
9         "log"
10         "net"
11         "net/http"
12         "os"
13         "os/exec"
14         "testing"
15         "time"
16 )
17
18 // Gocheck boilerplate
19 func Test(t *testing.T) { TestingT(t) }
20
21 // Gocheck boilerplate
22 var _ = Suite(&ServerRequiredSuite{})
23 var _ = Suite(&StandaloneSuite{})
24
25 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
26
27 // Tests that require the Keep server running
28 type ServerRequiredSuite struct{}
29
30 // Standalone tests
31 type StandaloneSuite struct{}
32
33 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
34         if *no_server {
35                 c.Skip("Skipping tests that require server")
36         } else {
37                 os.Chdir(os.ExpandEnv("$GOPATH../python"))
38                 exec.Command("python", "run_test_server.py", "start").Run()
39                 exec.Command("python", "run_test_server.py", "start_keep").Run()
40         }
41 }
42
43 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
44         os.Chdir(os.ExpandEnv("$GOPATH../python"))
45         exec.Command("python", "run_test_server.py", "stop_keep").Run()
46         exec.Command("python", "run_test_server.py", "stop").Run()
47 }
48
49 func (s *ServerRequiredSuite) TestInit(c *C) {
50         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
51         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
52         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
53
54         kc, err := MakeKeepClient()
55         c.Assert(kc.ApiServer, Equals, "localhost:3001")
56         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
57         c.Assert(kc.ApiInsecure, Equals, false)
58
59         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
60
61         kc, err = MakeKeepClient()
62         c.Assert(kc.ApiServer, Equals, "localhost:3001")
63         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
64         c.Assert(kc.ApiInsecure, Equals, true)
65
66         c.Assert(err, Equals, nil)
67         c.Assert(len(kc.Service_roots), Equals, 2)
68         c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
69         c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
70 }
71
72 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
73         kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
74
75         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
76         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
77         c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
78
79         // "bar" 37b51d194a7513e45b56f6524f2d51f2
80         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
81         c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
82 }
83
84 func ReadIntoBufferHelper(c *C, bufsize int) {
85         buffer := make([]byte, bufsize)
86
87         reader, writer := io.Pipe()
88         slices := make(chan ReaderSlice)
89
90         go ReadIntoBuffer(buffer, reader, slices)
91
92         {
93                 out := make([]byte, 128)
94                 for i := 0; i < 128; i += 1 {
95                         out[i] = byte(i)
96                 }
97                 writer.Write(out)
98                 s1 := <-slices
99                 c.Check(len(s1.slice), Equals, 128)
100                 c.Check(s1.reader_error, Equals, nil)
101                 for i := 0; i < 128; i += 1 {
102                         c.Check(s1.slice[i], Equals, byte(i))
103                 }
104                 for i := 0; i < len(buffer); i += 1 {
105                         if i < 128 {
106                                 c.Check(buffer[i], Equals, byte(i))
107                         } else {
108                                 c.Check(buffer[i], Equals, byte(0))
109                         }
110                 }
111         }
112         {
113                 out := make([]byte, 96)
114                 for i := 0; i < 96; i += 1 {
115                         out[i] = byte(i / 2)
116                 }
117                 writer.Write(out)
118                 s1 := <-slices
119                 c.Check(len(s1.slice), Equals, 96)
120                 c.Check(s1.reader_error, Equals, nil)
121                 for i := 0; i < 96; i += 1 {
122                         c.Check(s1.slice[i], Equals, byte(i/2))
123                 }
124                 for i := 0; i < len(buffer); i += 1 {
125                         if i < 128 {
126                                 c.Check(buffer[i], Equals, byte(i))
127                         } else if i < (128 + 96) {
128                                 c.Check(buffer[i], Equals, byte((i-128)/2))
129                         } else {
130                                 c.Check(buffer[i], Equals, byte(0))
131                         }
132                 }
133         }
134         {
135                 writer.Close()
136                 s1 := <-slices
137                 c.Check(len(s1.slice), Equals, 0)
138                 c.Check(s1.reader_error, Equals, io.EOF)
139         }
140 }
141
142 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
143         ReadIntoBufferHelper(c, 512)
144         ReadIntoBufferHelper(c, 225)
145         ReadIntoBufferHelper(c, 224)
146 }
147
148 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
149         buffer := make([]byte, 223)
150         reader, writer := io.Pipe()
151         slices := make(chan ReaderSlice)
152
153         go ReadIntoBuffer(buffer, reader, slices)
154
155         {
156                 out := make([]byte, 128)
157                 for i := 0; i < 128; i += 1 {
158                         out[i] = byte(i)
159                 }
160                 writer.Write(out)
161                 s1 := <-slices
162                 c.Check(len(s1.slice), Equals, 128)
163                 c.Check(s1.reader_error, Equals, nil)
164                 for i := 0; i < 128; i += 1 {
165                         c.Check(s1.slice[i], Equals, byte(i))
166                 }
167                 for i := 0; i < len(buffer); i += 1 {
168                         if i < 128 {
169                                 c.Check(buffer[i], Equals, byte(i))
170                         } else {
171                                 c.Check(buffer[i], Equals, byte(0))
172                         }
173                 }
174         }
175         {
176                 out := make([]byte, 96)
177                 for i := 0; i < 96; i += 1 {
178                         out[i] = byte(i / 2)
179                 }
180
181                 // Write will deadlock because it can't write all the data, so
182                 // spin it off to a goroutine
183                 go writer.Write(out)
184                 s1 := <-slices
185
186                 c.Check(len(s1.slice), Equals, 95)
187                 c.Check(s1.reader_error, Equals, nil)
188                 for i := 0; i < 95; i += 1 {
189                         c.Check(s1.slice[i], Equals, byte(i/2))
190                 }
191                 for i := 0; i < len(buffer); i += 1 {
192                         if i < 128 {
193                                 c.Check(buffer[i], Equals, byte(i))
194                         } else if i < (128 + 95) {
195                                 c.Check(buffer[i], Equals, byte((i-128)/2))
196                         } else {
197                                 c.Check(buffer[i], Equals, byte(0))
198                         }
199                 }
200         }
201         {
202                 writer.Close()
203                 s1 := <-slices
204                 c.Check(len(s1.slice), Equals, 0)
205                 c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
206         }
207
208 }
209
210 func (s *StandaloneSuite) TestTransfer(c *C) {
211         reader, writer := io.Pipe()
212
213         // Buffer for reads from 'r'
214         buffer := make([]byte, 512)
215
216         // Read requests on Transfer() buffer
217         requests := make(chan ReadRequest)
218         defer close(requests)
219
220         // Reporting reader error states
221         reader_status := make(chan error)
222
223         go Transfer(buffer, reader, requests, reader_status)
224
225         br1 := MakeBufferReader(requests)
226         out := make([]byte, 128)
227
228         {
229                 // Write some data, and read into a buffer shorter than
230                 // available data
231                 for i := 0; i < 128; i += 1 {
232                         out[i] = byte(i)
233                 }
234
235                 writer.Write(out[:100])
236
237                 in := make([]byte, 64)
238                 n, err := br1.Read(in)
239
240                 c.Check(n, Equals, 64)
241                 c.Check(err, Equals, nil)
242
243                 for i := 0; i < 64; i += 1 {
244                         c.Check(in[i], Equals, out[i])
245                 }
246         }
247
248         {
249                 // Write some more data, and read into buffer longer than
250                 // available data
251                 in := make([]byte, 64)
252                 n, err := br1.Read(in)
253                 c.Check(n, Equals, 36)
254                 c.Check(err, Equals, nil)
255
256                 for i := 0; i < 36; i += 1 {
257                         c.Check(in[i], Equals, out[64+i])
258                 }
259
260         }
261
262         {
263                 // Test read before write
264                 type Rd struct {
265                         n   int
266                         err error
267                 }
268                 rd := make(chan Rd)
269                 in := make([]byte, 64)
270
271                 go func() {
272                         n, err := br1.Read(in)
273                         rd <- Rd{n, err}
274                 }()
275
276                 time.Sleep(100 * time.Millisecond)
277                 writer.Write(out[100:])
278
279                 got := <-rd
280
281                 c.Check(got.n, Equals, 28)
282                 c.Check(got.err, Equals, nil)
283
284                 for i := 0; i < 28; i += 1 {
285                         c.Check(in[i], Equals, out[100+i])
286                 }
287         }
288
289         br2 := MakeBufferReader(requests)
290         {
291                 // Test 'catch up' reader
292                 in := make([]byte, 256)
293                 n, err := br2.Read(in)
294
295                 c.Check(n, Equals, 128)
296                 c.Check(err, Equals, nil)
297
298                 for i := 0; i < 128; i += 1 {
299                         c.Check(in[i], Equals, out[i])
300                 }
301         }
302
303         {
304                 // Test closing the reader
305                 writer.Close()
306                 status := <-reader_status
307                 c.Check(status, Equals, io.EOF)
308
309                 in := make([]byte, 256)
310                 n1, err1 := br1.Read(in)
311                 n2, err2 := br2.Read(in)
312                 c.Check(n1, Equals, 0)
313                 c.Check(err1, Equals, io.EOF)
314                 c.Check(n2, Equals, 0)
315                 c.Check(err2, Equals, io.EOF)
316         }
317
318         {
319                 // Test 'catch up' reader after closing
320                 br3 := MakeBufferReader(requests)
321                 in := make([]byte, 256)
322                 n, err := br3.Read(in)
323
324                 c.Check(n, Equals, 128)
325                 c.Check(err, Equals, nil)
326
327                 for i := 0; i < 128; i += 1 {
328                         c.Check(in[i], Equals, out[i])
329                 }
330
331                 n, err = br3.Read(in)
332
333                 c.Check(n, Equals, 0)
334                 c.Check(err, Equals, io.EOF)
335         }
336 }
337
338 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
339         reader, writer := io.Pipe()
340
341         // Buffer for reads from 'r'
342         buffer := make([]byte, 100)
343
344         // Read requests on Transfer() buffer
345         requests := make(chan ReadRequest)
346         defer close(requests)
347
348         // Reporting reader error states
349         reader_status := make(chan error)
350
351         go Transfer(buffer, reader, requests, reader_status)
352
353         out := make([]byte, 101)
354         go writer.Write(out)
355
356         status := <-reader_status
357         c.Check(status, Equals, io.ErrShortBuffer)
358 }
359
360 type StubHandler struct {
361         c              *C
362         expectPath     string
363         expectApiToken string
364         expectBody     string
365         handled        chan bool
366 }
367
368 func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
369         this.c.Check(req.URL.Path, Equals, this.expectPath)
370         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
371         body, err := ioutil.ReadAll(req.Body)
372         this.c.Check(err, Equals, nil)
373         this.c.Check(body, DeepEquals, []byte(this.expectBody))
374         resp.WriteHeader(200)
375         this.handled <- true
376 }
377
378 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
379         st := StubHandler{
380                 c,
381                 "/acbd18db4cc2f85cedef654fccc4a4d8",
382                 "abc123",
383                 "foo",
384                 make(chan bool)}
385         server := http.Server{Handler: st}
386
387         listener, _ := net.ListenTCP("tcp", &net.TCPAddr{Port: 2999})
388         defer listener.Close()
389
390         log.Printf("%s", listener.Addr().String())
391
392         go server.Serve(listener)
393         kc, _ := MakeKeepClient()
394         kc.ApiToken = "abc123"
395
396         reader, writer := io.Pipe()
397         upload_status := make(chan UploadError)
398
399         go kc.uploadToKeepServer("http://localhost:2999", "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
400
401         writer.Write([]byte("foo"))
402         writer.Close()
403
404         <-st.handled
405         status := <-upload_status
406         c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
407 }
408
// FailHandler is an http.Handler for tests that rejects every request.
type FailHandler struct {
	handled chan bool // receives true after the response status is written
}

// ServeHTTP answers with status 400 and then sends true on handled. The
// request itself is not inspected.
func (h FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	const failStatus = 400

	resp.WriteHeader(failStatus)
	h.handled <- true
}
417
418 /*func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
419         log.Printf("blup")
420
421         c.Check(true, Equals, false)
422
423         log.Printf("blug")
424
425         st := FailHandler{make(chan bool)}
426         server := http.Server{Handler: st}
427
428         listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
429         defer listener.Close()
430
431         go server.Serve(listener)
432         kc, _ := MakeKeepClient()
433         kc.ApiToken = "abc123"
434
435         reader, writer := io.Pipe()
436         upload_status := make(chan UploadError)
437
438         go kc.uploadToKeepServer(fmt.Sprintf("http://localhost:%s", listener.Addr().String()), "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
439
440         log.Printf("Writing 1")
441
442         writer.Write([]byte("foo"))
443
444         log.Printf("Writing 2")
445
446         writer.Close()
447
448         log.Printf("Writing 3")
449
450         <-st.handled
451
452         log.Printf("Handled?!")
453
454         status := <-upload_status
455         c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
456 }*/