1d3bbeee308761be39b189ecfac044f6d009ff97
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "arvados.org/streamer"
5         "crypto/md5"
6         "flag"
7         "fmt"
8         . "gopkg.in/check.v1"
9         "io"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/exec"
16         "sort"
17         "strings"
18         "testing"
19 )
20
21 // Gocheck boilerplate
22 func Test(t *testing.T) {
23         TestingT(t)
24 }
25
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
29
30 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31
32 // Tests that require the Keep server running
33 type ServerRequiredSuite struct{}
34
35 // Standalone tests
36 type StandaloneSuite struct{}
37
38 func pythonDir() string {
39         gopath := os.Getenv("GOPATH")
40         return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
41 }
42
43 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44         if *no_server {
45                 c.Skip("Skipping tests that require server")
46         } else {
47                 os.Chdir(pythonDir())
48                 exec.Command("python", "run_test_server.py", "start").Run()
49                 exec.Command("python", "run_test_server.py", "start_keep").Run()
50         }
51 }
52
53 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
54         os.Chdir(pythonDir())
55         exec.Command("python", "run_test_server.py", "stop_keep").Run()
56         exec.Command("python", "run_test_server.py", "stop").Run()
57 }
58
59 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
60         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
61         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
62         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
63
64         kc, err := MakeKeepClient()
65         c.Check(kc.ApiServer, Equals, "localhost:3001")
66         c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
67         c.Check(kc.ApiInsecure, Equals, false)
68
69         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
70
71         kc, err = MakeKeepClient()
72         c.Check(kc.ApiServer, Equals, "localhost:3001")
73         c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
74         c.Check(kc.ApiInsecure, Equals, true)
75         c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
76
77         c.Assert(err, Equals, nil)
78         c.Check(len(kc.Service_roots), Equals, 2)
79         c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
80         c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
81 }
82
83 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
84         kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
85
86         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
87         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
88         c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
89
90         // "bar" 37b51d194a7513e45b56f6524f2d51f2
91         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
92         c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
93 }
94
95 type StubPutHandler struct {
96         c              *C
97         expectPath     string
98         expectApiToken string
99         expectBody     string
100         handled        chan string
101 }
102
103 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
104         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
105         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
106         body, err := ioutil.ReadAll(req.Body)
107         this.c.Check(err, Equals, nil)
108         this.c.Check(body, DeepEquals, []byte(this.expectBody))
109         resp.WriteHeader(200)
110         this.handled <- fmt.Sprintf("http://%s", req.Host)
111 }
112
113 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
114         server := http.Server{Handler: st}
115
116         var err error
117         listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
118         if err != nil {
119                 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
120         }
121
122         url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
123
124         go server.Serve(listener)
125         return listener, url
126 }
127
128 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
129         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
130
131         listener, url := RunBogusKeepServer(st, 2990)
132         defer listener.Close()
133
134         kc, _ := MakeKeepClient()
135         kc.ApiToken = "abc123"
136
137         reader, writer := io.Pipe()
138         upload_status := make(chan uploadStatus)
139
140         f(kc, url, reader, writer, upload_status)
141 }
142
143 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
144         log.Printf("TestUploadToStubKeepServer")
145
146         st := StubPutHandler{
147                 c,
148                 "acbd18db4cc2f85cedef654fccc4a4d8",
149                 "abc123",
150                 "foo",
151                 make(chan string)}
152
153         UploadToStubHelper(c, st,
154                 func(kc KeepClient, url string, reader io.ReadCloser,
155                         writer io.WriteCloser, upload_status chan uploadStatus) {
156
157                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
158
159                         writer.Write([]byte("foo"))
160                         writer.Close()
161
162                         <-st.handled
163                         status := <-upload_status
164                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
165                 })
166
167         log.Printf("TestUploadToStubKeepServer done")
168 }
169
170 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
171         log.Printf("TestUploadToStubKeepServerBufferReader")
172
173         st := StubPutHandler{
174                 c,
175                 "acbd18db4cc2f85cedef654fccc4a4d8",
176                 "abc123",
177                 "foo",
178                 make(chan string)}
179
180         UploadToStubHelper(c, st,
181                 func(kc KeepClient, url string, reader io.ReadCloser,
182                         writer io.WriteCloser, upload_status chan uploadStatus) {
183
184                         tr := streamer.AsyncStreamFromReader(512, reader)
185                         defer tr.Close()
186
187                         br1 := tr.MakeStreamReader()
188
189                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
190
191                         writer.Write([]byte("foo"))
192                         writer.Close()
193
194                         <-st.handled
195
196                         status := <-upload_status
197                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
198                 })
199
200         log.Printf("TestUploadToStubKeepServerBufferReader done")
201 }
202
203 type FailHandler struct {
204         handled chan string
205 }
206
207 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
208         resp.WriteHeader(500)
209         this.handled <- fmt.Sprintf("http://%s", req.Host)
210 }
211
212 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
213         log.Printf("TestFailedUploadToStubKeepServer")
214
215         st := FailHandler{
216                 make(chan string)}
217
218         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
219
220         UploadToStubHelper(c, st,
221                 func(kc KeepClient, url string, reader io.ReadCloser,
222                         writer io.WriteCloser, upload_status chan uploadStatus) {
223
224                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
225
226                         writer.Write([]byte("foo"))
227                         writer.Close()
228
229                         <-st.handled
230
231                         status := <-upload_status
232                         c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
233                         c.Check(status.StatusCode, Equals, 500)
234                 })
235         log.Printf("TestFailedUploadToStubKeepServer done")
236 }
237
238 type KeepServer struct {
239         listener net.Listener
240         url      string
241 }
242
243 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
244         ks = make([]KeepServer, n)
245
246         for i := 0; i < n; i += 1 {
247                 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
248                 ks[i] = KeepServer{boguslistener, bogusurl}
249         }
250
251         return ks
252 }
253
254 func (s *StandaloneSuite) TestPutB(c *C) {
255         log.Printf("TestPutB")
256
257         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
258
259         st := StubPutHandler{
260                 c,
261                 hash,
262                 "abc123",
263                 "foo",
264                 make(chan string, 2)}
265
266         kc, _ := MakeKeepClient()
267
268         kc.Want_replicas = 2
269         kc.ApiToken = "abc123"
270         kc.Service_roots = make([]string, 5)
271
272         ks := RunSomeFakeKeepServers(st, 5, 2990)
273
274         for i := 0; i < len(ks); i += 1 {
275                 kc.Service_roots[i] = ks[i].url
276                 defer ks[i].listener.Close()
277         }
278
279         sort.Strings(kc.Service_roots)
280
281         kc.PutB([]byte("foo"))
282
283         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
284
285         s1 := <-st.handled
286         s2 := <-st.handled
287         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
288                 (s1 == shuff[1] && s2 == shuff[0]),
289                 Equals,
290                 true)
291
292         log.Printf("TestPutB done")
293 }
294
295 func (s *StandaloneSuite) TestPutHR(c *C) {
296         log.Printf("TestPutHR")
297
298         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
299
300         st := StubPutHandler{
301                 c,
302                 hash,
303                 "abc123",
304                 "foo",
305                 make(chan string, 2)}
306
307         kc, _ := MakeKeepClient()
308
309         kc.Want_replicas = 2
310         kc.ApiToken = "abc123"
311         kc.Service_roots = make([]string, 5)
312
313         ks := RunSomeFakeKeepServers(st, 5, 2990)
314
315         for i := 0; i < len(ks); i += 1 {
316                 kc.Service_roots[i] = ks[i].url
317                 defer ks[i].listener.Close()
318         }
319
320         sort.Strings(kc.Service_roots)
321
322         reader, writer := io.Pipe()
323
324         go func() {
325                 writer.Write([]byte("foo"))
326                 writer.Close()
327         }()
328
329         kc.PutHR(hash, reader, 3)
330
331         shuff := kc.shuffledServiceRoots(hash)
332         log.Print(shuff)
333
334         s1 := <-st.handled
335         s2 := <-st.handled
336
337         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
338                 (s1 == shuff[1] && s2 == shuff[0]),
339                 Equals,
340                 true)
341
342         log.Printf("TestPutHR done")
343 }
344
345 func (s *StandaloneSuite) TestPutWithFail(c *C) {
346         log.Printf("TestPutWithFail")
347
348         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
349
350         st := StubPutHandler{
351                 c,
352                 hash,
353                 "abc123",
354                 "foo",
355                 make(chan string, 2)}
356
357         fh := FailHandler{
358                 make(chan string, 1)}
359
360         kc, _ := MakeKeepClient()
361
362         kc.Want_replicas = 2
363         kc.ApiToken = "abc123"
364         kc.Service_roots = make([]string, 5)
365
366         ks1 := RunSomeFakeKeepServers(st, 4, 2990)
367         ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
368
369         for i, k := range ks1 {
370                 kc.Service_roots[i] = k.url
371                 defer k.listener.Close()
372         }
373         for i, k := range ks2 {
374                 kc.Service_roots[len(ks1)+i] = k.url
375                 defer k.listener.Close()
376         }
377
378         sort.Strings(kc.Service_roots)
379
380         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
381
382         phash, replicas, err := kc.PutB([]byte("foo"))
383
384         <-fh.handled
385
386         c.Check(err, Equals, nil)
387         c.Check(phash, Equals, hash)
388         c.Check(replicas, Equals, 2)
389         c.Check(<-st.handled, Equals, shuff[1])
390         c.Check(<-st.handled, Equals, shuff[2])
391 }
392
393 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
394         log.Printf("TestPutWithTooManyFail")
395
396         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
397
398         st := StubPutHandler{
399                 c,
400                 hash,
401                 "abc123",
402                 "foo",
403                 make(chan string, 1)}
404
405         fh := FailHandler{
406                 make(chan string, 4)}
407
408         kc, _ := MakeKeepClient()
409
410         kc.Want_replicas = 2
411         kc.ApiToken = "abc123"
412         kc.Service_roots = make([]string, 5)
413
414         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
415         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
416
417         for i, k := range ks1 {
418                 kc.Service_roots[i] = k.url
419                 defer k.listener.Close()
420         }
421         for i, k := range ks2 {
422                 kc.Service_roots[len(ks1)+i] = k.url
423                 defer k.listener.Close()
424         }
425
426         sort.Strings(kc.Service_roots)
427
428         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
429
430         _, replicas, err := kc.PutB([]byte("foo"))
431
432         c.Check(err, Equals, InsufficientReplicasError)
433         c.Check(replicas, Equals, 1)
434         c.Check(<-st.handled, Equals, shuff[1])
435
436         log.Printf("TestPutWithTooManyFail done")
437 }
438
439 type StubGetHandler struct {
440         c              *C
441         expectPath     string
442         expectApiToken string
443         returnBody     []byte
444 }
445
446 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
447         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
448         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
449         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
450         resp.Write(this.returnBody)
451 }
452
453 func (s *StandaloneSuite) TestGet(c *C) {
454         log.Printf("TestGet")
455
456         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
457
458         st := StubGetHandler{
459                 c,
460                 hash,
461                 "abc123",
462                 []byte("foo")}
463
464         listener, url := RunBogusKeepServer(st, 2990)
465         defer listener.Close()
466
467         kc, _ := MakeKeepClient()
468         kc.ApiToken = "abc123"
469         kc.Service_roots = []string{url}
470
471         r, n, url2, err := kc.Get(hash)
472         defer r.Close()
473         c.Check(err, Equals, nil)
474         c.Check(n, Equals, int64(3))
475         c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
476
477         content, err2 := ioutil.ReadAll(r)
478         c.Check(err2, Equals, nil)
479         c.Check(content, DeepEquals, []byte("foo"))
480
481         log.Printf("TestGet done")
482 }
483
484 func (s *StandaloneSuite) TestGetFail(c *C) {
485         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
486
487         st := FailHandler{make(chan string, 1)}
488
489         listener, url := RunBogusKeepServer(st, 2990)
490         defer listener.Close()
491
492         kc, _ := MakeKeepClient()
493         kc.ApiToken = "abc123"
494         kc.Service_roots = []string{url}
495
496         r, n, url2, err := kc.Get(hash)
497         c.Check(err, Equals, BlockNotFound)
498         c.Check(n, Equals, int64(0))
499         c.Check(url2, Equals, "")
500         c.Check(r, Equals, nil)
501 }
502
503 type BarHandler struct {
504         handled chan string
505 }
506
507 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
508         resp.Write([]byte("bar"))
509         this.handled <- fmt.Sprintf("http://%s", req.Host)
510 }
511
512 func (s *StandaloneSuite) TestChecksum(c *C) {
513         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
514         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
515
516         st := BarHandler{make(chan string, 1)}
517
518         listener, url := RunBogusKeepServer(st, 2990)
519         defer listener.Close()
520
521         kc, _ := MakeKeepClient()
522         kc.ApiToken = "abc123"
523         kc.Service_roots = []string{url}
524
525         r, n, _, err := kc.Get(barhash)
526         _, err = ioutil.ReadAll(r)
527         c.Check(n, Equals, int64(3))
528         c.Check(err, Equals, nil)
529
530         <-st.handled
531
532         r, n, _, err = kc.Get(foohash)
533         _, err = ioutil.ReadAll(r)
534         c.Check(n, Equals, int64(3))
535         c.Check(err, Equals, BadChecksum)
536
537         <-st.handled
538 }
539
540 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
541
542         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
543
544         fh := FailHandler{
545                 make(chan string, 1)}
546
547         st := StubGetHandler{
548                 c,
549                 hash,
550                 "abc123",
551                 []byte("foo")}
552
553         kc, _ := MakeKeepClient()
554         kc.ApiToken = "abc123"
555         kc.Service_roots = make([]string, 5)
556
557         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
558         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
559
560         for i, k := range ks1 {
561                 kc.Service_roots[i] = k.url
562                 defer k.listener.Close()
563         }
564         for i, k := range ks2 {
565                 kc.Service_roots[len(ks1)+i] = k.url
566                 defer k.listener.Close()
567         }
568
569         sort.Strings(kc.Service_roots)
570
571         r, n, url2, err := kc.Get(hash)
572         <-fh.handled
573         c.Check(err, Equals, nil)
574         c.Check(n, Equals, int64(3))
575         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
576
577         content, err2 := ioutil.ReadAll(r)
578         c.Check(err2, Equals, nil)
579         c.Check(content, DeepEquals, []byte("foo"))
580 }
581
582 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
583         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
584         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
585         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
586
587         kc, err := MakeKeepClient()
588         c.Assert(err, Equals, nil)
589
590         hash, replicas, err := kc.PutB([]byte("foo"))
591         c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
592         c.Check(replicas, Equals, 2)
593         c.Check(err, Equals, nil)
594
595         {
596                 r, n, url2, err := kc.Get(hash)
597                 c.Check(err, Equals, nil)
598                 c.Check(n, Equals, int64(3))
599                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
600
601                 content, err2 := ioutil.ReadAll(r)
602                 c.Check(err2, Equals, nil)
603                 c.Check(content, DeepEquals, []byte("foo"))
604         }
605
606         {
607                 n, url2, err := kc.Ask(hash)
608                 c.Check(err, Equals, nil)
609                 c.Check(n, Equals, int64(3))
610                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
611         }
612 }