2798: Continued refactoring buffer reader into separate buffer package. Made
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "arvados.org/buffer"
5         "crypto/md5"
6         "flag"
7         "fmt"
8         . "gopkg.in/check.v1"
9         "io"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/exec"
16         "sort"
17         "strings"
18         "testing"
19 )
20
// Test is the gocheck boilerplate entry point: it hands the standard
// *testing.T over to gocheck's TestingT so that "go test" drives the
// registered suites.
func Test(t *testing.T) {
        TestingT(t)
}
25
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
29
30 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31
32 // Tests that require the Keep server running
33 type ServerRequiredSuite struct{}
34
35 // Standalone tests
36 type StandaloneSuite struct{}
37
// pythonDir returns the "python" directory that sits next to the first
// entry of the colon-separated GOPATH environment variable.
func pythonDir() string {
	entries := strings.Split(os.Getenv("GOPATH"), ":")
	firstEntry := entries[0]
	return fmt.Sprintf("%s/../python", firstEntry)
}
42
43 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44         if *no_server {
45                 c.Skip("Skipping tests that require server")
46         } else {
47                 os.Chdir(pythonDir())
48                 exec.Command("python", "run_test_server.py", "start").Run()
49                 exec.Command("python", "run_test_server.py", "start_keep").Run()
50         }
51 }
52
53 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
54         os.Chdir(pythonDir())
55         exec.Command("python", "run_test_server.py", "stop_keep").Run()
56         exec.Command("python", "run_test_server.py", "stop").Run()
57 }
58
59 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
60         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
61         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
62         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
63
64         kc, err := MakeKeepClient()
65         c.Check(kc.ApiServer, Equals, "localhost:3001")
66         c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
67         c.Check(kc.ApiInsecure, Equals, false)
68
69         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
70
71         kc, err = MakeKeepClient()
72         c.Check(kc.ApiServer, Equals, "localhost:3001")
73         c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
74         c.Check(kc.ApiInsecure, Equals, true)
75         c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
76
77         c.Assert(err, Equals, nil)
78         c.Check(len(kc.Service_roots), Equals, 2)
79         c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
80         c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
81 }
82
83 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
84         kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
85
86         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
87         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
88         c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
89
90         // "bar" 37b51d194a7513e45b56f6524f2d51f2
91         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
92         c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
93 }
94
95 type StubPutHandler struct {
96         c              *C
97         expectPath     string
98         expectApiToken string
99         expectBody     string
100         handled        chan string
101 }
102
103 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
104         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
105         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
106         body, err := ioutil.ReadAll(req.Body)
107         this.c.Check(err, Equals, nil)
108         this.c.Check(body, DeepEquals, []byte(this.expectBody))
109         resp.WriteHeader(200)
110         this.handled <- fmt.Sprintf("http://%s", req.Host)
111 }
112
// RunBogusKeepServer starts an HTTP server on the given TCP port that
// answers every request with st, and returns the listener together with the
// "http://localhost:<port>" URL under which the server can be reached.
//
// Passing port 0 lets the operating system pick a free port; the returned
// URL always carries the port that was actually bound. The server runs in
// its own goroutine until the caller closes the returned listener.
//
// The function panics when the port cannot be bound; the panic message
// includes the underlying error so the cause (e.g. port already in use) is
// visible in the test output.
func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
	tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: port})
	if err != nil {
		panic(fmt.Sprintf("Could not listen on tcp port %v: %v", port, err))
	}

	url = fmt.Sprintf("http://localhost:%d", tcpListener.Addr().(*net.TCPAddr).Port)

	server := &http.Server{Handler: st}
	// Serve returns once the listener is closed; its error is of no interest
	// to the tests.
	go server.Serve(tcpListener)

	return tcpListener, url
}
127
128 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
129         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
130
131         listener, url := RunBogusKeepServer(st, 2990)
132         defer listener.Close()
133
134         kc, _ := MakeKeepClient()
135         kc.ApiToken = "abc123"
136
137         reader, writer := io.Pipe()
138         upload_status := make(chan uploadStatus)
139
140         f(kc, url, reader, writer, upload_status)
141 }
142
143 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
144         st := StubPutHandler{
145                 c,
146                 "acbd18db4cc2f85cedef654fccc4a4d8",
147                 "abc123",
148                 "foo",
149                 make(chan string)}
150
151         UploadToStubHelper(c, st,
152                 func(kc KeepClient, url string, reader io.ReadCloser,
153                         writer io.WriteCloser, upload_status chan uploadStatus) {
154
155                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
156
157                         writer.Write([]byte("foo"))
158                         writer.Close()
159
160                         <-st.handled
161                         status := <-upload_status
162                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
163                 })
164 }
165
166 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
167         st := StubPutHandler{
168                 c,
169                 "acbd18db4cc2f85cedef654fccc4a4d8",
170                 "abc123",
171                 "foo",
172                 make(chan string)}
173
174         UploadToStubHelper(c, st,
175                 func(kc KeepClient, url string, reader io.ReadCloser,
176                         writer io.WriteCloser, upload_status chan uploadStatus) {
177
178                         tr := buffer.StartTransferFromReader(512, reader)
179                         defer tr.Close()
180
181                         br1 := tr.MakeBufferReader()
182
183                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
184
185                         writer.Write([]byte("foo"))
186                         writer.Close()
187
188                         <-tr.Reader_status
189                         <-st.handled
190
191                         status := <-upload_status
192                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
193                 })
194 }
195
// FailHandler is an http.Handler that fails every request with status 500.
type FailHandler struct {
	handled chan string // receives "http://<host>" for every request served
}

// ServeHTTP answers with status 500 and then announces the host the request
// was addressed to on the handled channel.
func (h FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
204
205 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
206         st := FailHandler{
207                 make(chan string)}
208
209         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
210
211         UploadToStubHelper(c, st,
212                 func(kc KeepClient, url string, reader io.ReadCloser,
213                         writer io.WriteCloser, upload_status chan uploadStatus) {
214
215                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
216
217                         writer.Write([]byte("foo"))
218                         writer.Close()
219
220                         <-st.handled
221
222                         status := <-upload_status
223                         c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
224                         c.Check(status.StatusCode, Equals, 500)
225                 })
226
227 }
228
229 type KeepServer struct {
230         listener net.Listener
231         url      string
232 }
233
234 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
235         ks = make([]KeepServer, n)
236
237         for i := 0; i < n; i += 1 {
238                 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
239                 ks[i] = KeepServer{boguslistener, bogusurl}
240         }
241
242         return ks
243 }
244
245 func (s *StandaloneSuite) TestPutB(c *C) {
246         log.Printf("TestPutB")
247
248         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
249
250         st := StubPutHandler{
251                 c,
252                 hash,
253                 "abc123",
254                 "foo",
255                 make(chan string, 2)}
256
257         kc, _ := MakeKeepClient()
258
259         kc.Want_replicas = 2
260         kc.ApiToken = "abc123"
261         kc.Service_roots = make([]string, 5)
262
263         ks := RunSomeFakeKeepServers(st, 5, 2990)
264
265         for i := 0; i < len(ks); i += 1 {
266                 kc.Service_roots[i] = ks[i].url
267                 defer ks[i].listener.Close()
268         }
269
270         sort.Strings(kc.Service_roots)
271
272         kc.PutB([]byte("foo"))
273
274         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
275
276         s1 := <-st.handled
277         s2 := <-st.handled
278         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
279                 (s1 == shuff[1] && s2 == shuff[0]),
280                 Equals,
281                 true)
282 }
283
284 func (s *StandaloneSuite) TestPutHR(c *C) {
285         log.Printf("TestPutHR")
286
287         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
288
289         st := StubPutHandler{
290                 c,
291                 hash,
292                 "abc123",
293                 "foo",
294                 make(chan string, 2)}
295
296         kc, _ := MakeKeepClient()
297
298         kc.Want_replicas = 2
299         kc.ApiToken = "abc123"
300         kc.Service_roots = make([]string, 5)
301
302         ks := RunSomeFakeKeepServers(st, 5, 2990)
303
304         for i := 0; i < len(ks); i += 1 {
305                 kc.Service_roots[i] = ks[i].url
306                 defer ks[i].listener.Close()
307         }
308
309         sort.Strings(kc.Service_roots)
310
311         reader, writer := io.Pipe()
312
313         go func() {
314                 writer.Write([]byte("foo"))
315                 writer.Close()
316         }()
317
318         kc.PutHR(hash, reader, 3)
319
320         shuff := kc.shuffledServiceRoots(hash)
321         log.Print(shuff)
322
323         s1 := <-st.handled
324         s2 := <-st.handled
325
326         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
327                 (s1 == shuff[1] && s2 == shuff[0]),
328                 Equals,
329                 true)
330 }
331
332 func (s *StandaloneSuite) TestPutWithFail(c *C) {
333         log.Printf("TestPutWithFail")
334
335         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
336
337         st := StubPutHandler{
338                 c,
339                 hash,
340                 "abc123",
341                 "foo",
342                 make(chan string, 2)}
343
344         fh := FailHandler{
345                 make(chan string, 1)}
346
347         kc, _ := MakeKeepClient()
348
349         kc.Want_replicas = 2
350         kc.ApiToken = "abc123"
351         kc.Service_roots = make([]string, 5)
352
353         ks1 := RunSomeFakeKeepServers(st, 4, 2990)
354         ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
355
356         for i, k := range ks1 {
357                 kc.Service_roots[i] = k.url
358                 defer k.listener.Close()
359         }
360         for i, k := range ks2 {
361                 kc.Service_roots[len(ks1)+i] = k.url
362                 defer k.listener.Close()
363         }
364
365         sort.Strings(kc.Service_roots)
366
367         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
368
369         phash, replicas, err := kc.PutB([]byte("foo"))
370
371         <-fh.handled
372
373         c.Check(err, Equals, nil)
374         c.Check(phash, Equals, hash)
375         c.Check(replicas, Equals, 2)
376         c.Check(<-st.handled, Equals, shuff[1])
377         c.Check(<-st.handled, Equals, shuff[2])
378 }
379
380 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
381         log.Printf("TestPutWithTooManyFail")
382
383         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
384
385         st := StubPutHandler{
386                 c,
387                 hash,
388                 "abc123",
389                 "foo",
390                 make(chan string, 1)}
391
392         fh := FailHandler{
393                 make(chan string, 4)}
394
395         kc, _ := MakeKeepClient()
396
397         kc.Want_replicas = 2
398         kc.ApiToken = "abc123"
399         kc.Service_roots = make([]string, 5)
400
401         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
402         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
403
404         for i, k := range ks1 {
405                 kc.Service_roots[i] = k.url
406                 defer k.listener.Close()
407         }
408         for i, k := range ks2 {
409                 kc.Service_roots[len(ks1)+i] = k.url
410                 defer k.listener.Close()
411         }
412
413         sort.Strings(kc.Service_roots)
414
415         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
416
417         _, replicas, err := kc.PutB([]byte("foo"))
418
419         c.Check(err, Equals, InsufficientReplicasError)
420         c.Check(replicas, Equals, 1)
421         c.Check(<-st.handled, Equals, shuff[1])
422 }
423
424 type StubGetHandler struct {
425         c              *C
426         expectPath     string
427         expectApiToken string
428         returnBody     []byte
429 }
430
431 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
432         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
433         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
434         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
435         resp.Write(this.returnBody)
436 }
437
438 func (s *StandaloneSuite) TestGet(c *C) {
439
440         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
441
442         st := StubGetHandler{
443                 c,
444                 hash,
445                 "abc123",
446                 []byte("foo")}
447
448         listener, url := RunBogusKeepServer(st, 2990)
449         defer listener.Close()
450
451         kc, _ := MakeKeepClient()
452         kc.ApiToken = "abc123"
453         kc.Service_roots = []string{url}
454
455         r, n, url2, err := kc.Get(hash)
456         c.Check(err, Equals, nil)
457         c.Check(n, Equals, int64(3))
458         c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
459
460         content, err2 := ioutil.ReadAll(r)
461         c.Check(err2, Equals, nil)
462         c.Check(content, DeepEquals, []byte("foo"))
463 }
464
465 func (s *StandaloneSuite) TestGetFail(c *C) {
466         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
467
468         st := FailHandler{make(chan string, 1)}
469
470         listener, url := RunBogusKeepServer(st, 2990)
471         defer listener.Close()
472
473         kc, _ := MakeKeepClient()
474         kc.ApiToken = "abc123"
475         kc.Service_roots = []string{url}
476
477         r, n, url2, err := kc.Get(hash)
478         c.Check(err, Equals, BlockNotFound)
479         c.Check(n, Equals, int64(0))
480         c.Check(url2, Equals, "")
481         c.Check(r, Equals, nil)
482 }
483
// BarHandler is an http.Handler that answers every request with the body
// "bar", whatever was asked for.
type BarHandler struct {
	handled chan string // receives "http://<host>" for every request served
}

// ServeHTTP writes "bar" as the response body and then announces the host
// the request was addressed to on the handled channel.
func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	body := []byte("bar")
	resp.Write(body)
	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
492
493 func (s *StandaloneSuite) TestChecksum(c *C) {
494         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
495         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
496
497         st := BarHandler{make(chan string, 1)}
498
499         listener, url := RunBogusKeepServer(st, 2990)
500         defer listener.Close()
501
502         kc, _ := MakeKeepClient()
503         kc.ApiToken = "abc123"
504         kc.Service_roots = []string{url}
505
506         r, n, _, err := kc.Get(barhash)
507         _, err = ioutil.ReadAll(r)
508         c.Check(n, Equals, int64(3))
509         c.Check(err, Equals, nil)
510
511         <-st.handled
512
513         r, n, _, err = kc.Get(foohash)
514         _, err = ioutil.ReadAll(r)
515         c.Check(n, Equals, int64(3))
516         c.Check(err, Equals, BadChecksum)
517
518         <-st.handled
519 }
520
521 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
522
523         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
524
525         fh := FailHandler{
526                 make(chan string, 1)}
527
528         st := StubGetHandler{
529                 c,
530                 hash,
531                 "abc123",
532                 []byte("foo")}
533
534         kc, _ := MakeKeepClient()
535         kc.ApiToken = "abc123"
536         kc.Service_roots = make([]string, 5)
537
538         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
539         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
540
541         for i, k := range ks1 {
542                 kc.Service_roots[i] = k.url
543                 defer k.listener.Close()
544         }
545         for i, k := range ks2 {
546                 kc.Service_roots[len(ks1)+i] = k.url
547                 defer k.listener.Close()
548         }
549
550         sort.Strings(kc.Service_roots)
551
552         r, n, url2, err := kc.Get(hash)
553         <-fh.handled
554         c.Check(err, Equals, nil)
555         c.Check(n, Equals, int64(3))
556         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
557
558         content, err2 := ioutil.ReadAll(r)
559         c.Check(err2, Equals, nil)
560         c.Check(content, DeepEquals, []byte("foo"))
561 }
562
563 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
564         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
565         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
566         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
567
568         kc, err := MakeKeepClient()
569         c.Assert(err, Equals, nil)
570
571         hash, replicas, err := kc.PutB([]byte("foo"))
572         c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
573         c.Check(replicas, Equals, 2)
574         c.Check(err, Equals, nil)
575
576         {
577                 r, n, url2, err := kc.Get(hash)
578                 c.Check(err, Equals, nil)
579                 c.Check(n, Equals, int64(3))
580                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
581
582                 content, err2 := ioutil.ReadAll(r)
583                 c.Check(err2, Equals, nil)
584                 c.Check(content, DeepEquals, []byte("foo"))
585         }
586
587         {
588                 n, url2, err := kc.Ask(hash)
589                 c.Check(err, Equals, nil)
590                 c.Check(n, Equals, int64(3))
591                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
592         }
593 }