Merge branch '5043-crunchstat-long-lines' closes #5043
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9         "git.curoverse.com/arvados.git/sdk/go/streamer"
10         . "gopkg.in/check.v1"
11         "io"
12         "io/ioutil"
13         "log"
14         "net"
15         "net/http"
16         "os"
17         "testing"
18 )
19
20 // Gocheck boilerplate
21 func Test(t *testing.T) {
22         TestingT(t)
23 }
24
25 // Gocheck boilerplate
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
28
29 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
30
31 // Tests that require the Keep server running
32 type ServerRequiredSuite struct{}
33
34 // Standalone tests
35 type StandaloneSuite struct{}
36
37 func pythonDir() string {
38         cwd, _ := os.Getwd()
39         return fmt.Sprintf("%s/../../python/tests", cwd)
40 }
41
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
43         if *no_server {
44                 c.Skip("Skipping tests that require server")
45                 return
46         }
47         arvadostest.StartAPI()
48         arvadostest.StartKeep()
49 }
50
51 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
52         if *no_server {
53                 return
54         }
55         arvadostest.StopKeep()
56         arvadostest.StopAPI()
57 }
58
59 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
60         arv, err := arvadosclient.MakeArvadosClient()
61         c.Assert(err, Equals, nil)
62
63         kc, err := MakeKeepClient(&arv)
64
65         c.Assert(err, Equals, nil)
66         c.Check(len(kc.ServiceRoots()), Equals, 2)
67         for _, root := range kc.ServiceRoots() {
68                 c.Check(root, Matches, "http://localhost:\\d+")
69         }
70 }
71
72 type StubPutHandler struct {
73         c              *C
74         expectPath     string
75         expectApiToken string
76         expectBody     string
77         handled        chan string
78 }
79
80 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
81         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
82         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
83         body, err := ioutil.ReadAll(req.Body)
84         this.c.Check(err, Equals, nil)
85         this.c.Check(body, DeepEquals, []byte(this.expectBody))
86         resp.WriteHeader(200)
87         this.handled <- fmt.Sprintf("http://%s", req.Host)
88 }
89
90 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
91         var err error
92         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
93         if err != nil {
94                 panic(fmt.Sprintf("Could not listen on any port"))
95         }
96         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
97         go http.Serve(ks.listener, st)
98         return
99 }
100
101 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
102         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
103
104         ks := RunFakeKeepServer(st)
105         defer ks.listener.Close()
106
107         arv, _ := arvadosclient.MakeArvadosClient()
108         arv.ApiToken = "abc123"
109
110         kc, _ := MakeKeepClient(&arv)
111
112         reader, writer := io.Pipe()
113         upload_status := make(chan uploadStatus)
114
115         f(kc, ks.url, reader, writer, upload_status)
116 }
117
118 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
119         log.Printf("TestUploadToStubKeepServer")
120
121         st := StubPutHandler{
122                 c,
123                 "acbd18db4cc2f85cedef654fccc4a4d8",
124                 "abc123",
125                 "foo",
126                 make(chan string)}
127
128         UploadToStubHelper(c, st,
129                 func(kc KeepClient, url string, reader io.ReadCloser,
130                         writer io.WriteCloser, upload_status chan uploadStatus) {
131
132                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
133
134                         writer.Write([]byte("foo"))
135                         writer.Close()
136
137                         <-st.handled
138                         status := <-upload_status
139                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
140                 })
141
142         log.Printf("TestUploadToStubKeepServer done")
143 }
144
145 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
146         log.Printf("TestUploadToStubKeepServerBufferReader")
147
148         st := StubPutHandler{
149                 c,
150                 "acbd18db4cc2f85cedef654fccc4a4d8",
151                 "abc123",
152                 "foo",
153                 make(chan string)}
154
155         UploadToStubHelper(c, st,
156                 func(kc KeepClient, url string, reader io.ReadCloser,
157                         writer io.WriteCloser, upload_status chan uploadStatus) {
158
159                         tr := streamer.AsyncStreamFromReader(512, reader)
160                         defer tr.Close()
161
162                         br1 := tr.MakeStreamReader()
163
164                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
165
166                         writer.Write([]byte("foo"))
167                         writer.Close()
168
169                         <-st.handled
170
171                         status := <-upload_status
172                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
173                 })
174
175         log.Printf("TestUploadToStubKeepServerBufferReader done")
176 }
177
178 type FailHandler struct {
179         handled chan string
180 }
181
182 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
183         resp.WriteHeader(500)
184         this.handled <- fmt.Sprintf("http://%s", req.Host)
185 }
186
187 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
188         log.Printf("TestFailedUploadToStubKeepServer")
189
190         st := FailHandler{
191                 make(chan string)}
192
193         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
194
195         UploadToStubHelper(c, st,
196                 func(kc KeepClient, url string, reader io.ReadCloser,
197                         writer io.WriteCloser, upload_status chan uploadStatus) {
198
199                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
200
201                         writer.Write([]byte("foo"))
202                         writer.Close()
203
204                         <-st.handled
205
206                         status := <-upload_status
207                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
208                         c.Check(status.statusCode, Equals, 500)
209                 })
210         log.Printf("TestFailedUploadToStubKeepServer done")
211 }
212
213 type KeepServer struct {
214         listener net.Listener
215         url      string
216 }
217
218 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
219         ks = make([]KeepServer, n)
220
221         for i := 0; i < n; i += 1 {
222                 ks[i] = RunFakeKeepServer(st)
223         }
224
225         return ks
226 }
227
228 func (s *StandaloneSuite) TestPutB(c *C) {
229         log.Printf("TestPutB")
230
231         hash := Md5String("foo")
232
233         st := StubPutHandler{
234                 c,
235                 hash,
236                 "abc123",
237                 "foo",
238                 make(chan string, 5)}
239
240         arv, _ := arvadosclient.MakeArvadosClient()
241         kc, _ := MakeKeepClient(&arv)
242
243         kc.Want_replicas = 2
244         arv.ApiToken = "abc123"
245         service_roots := make(map[string]string)
246
247         ks := RunSomeFakeKeepServers(st, 5)
248
249         for i, k := range ks {
250                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
251                 defer k.listener.Close()
252         }
253
254         kc.SetServiceRoots(service_roots)
255
256         kc.PutB([]byte("foo"))
257
258         shuff := NewRootSorter(
259                 kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
260
261         s1 := <-st.handled
262         s2 := <-st.handled
263         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
264                 (s1 == shuff[1] && s2 == shuff[0]),
265                 Equals,
266                 true)
267
268         log.Printf("TestPutB done")
269 }
270
271 func (s *StandaloneSuite) TestPutHR(c *C) {
272         log.Printf("TestPutHR")
273
274         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
275
276         st := StubPutHandler{
277                 c,
278                 hash,
279                 "abc123",
280                 "foo",
281                 make(chan string, 5)}
282
283         arv, _ := arvadosclient.MakeArvadosClient()
284         kc, _ := MakeKeepClient(&arv)
285
286         kc.Want_replicas = 2
287         arv.ApiToken = "abc123"
288         service_roots := make(map[string]string)
289
290         ks := RunSomeFakeKeepServers(st, 5)
291
292         for i, k := range ks {
293                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
294                 defer k.listener.Close()
295         }
296
297         kc.SetServiceRoots(service_roots)
298
299         reader, writer := io.Pipe()
300
301         go func() {
302                 writer.Write([]byte("foo"))
303                 writer.Close()
304         }()
305
306         kc.PutHR(hash, reader, 3)
307
308         shuff := NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots()
309         log.Print(shuff)
310
311         s1 := <-st.handled
312         s2 := <-st.handled
313
314         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
315                 (s1 == shuff[1] && s2 == shuff[0]),
316                 Equals,
317                 true)
318
319         log.Printf("TestPutHR done")
320 }
321
322 func (s *StandaloneSuite) TestPutWithFail(c *C) {
323         log.Printf("TestPutWithFail")
324
325         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
326
327         st := StubPutHandler{
328                 c,
329                 hash,
330                 "abc123",
331                 "foo",
332                 make(chan string, 4)}
333
334         fh := FailHandler{
335                 make(chan string, 1)}
336
337         arv, err := arvadosclient.MakeArvadosClient()
338         kc, _ := MakeKeepClient(&arv)
339
340         kc.Want_replicas = 2
341         arv.ApiToken = "abc123"
342         service_roots := make(map[string]string)
343
344         ks1 := RunSomeFakeKeepServers(st, 4)
345         ks2 := RunSomeFakeKeepServers(fh, 1)
346
347         for i, k := range ks1 {
348                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
349                 defer k.listener.Close()
350         }
351         for i, k := range ks2 {
352                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
353                 defer k.listener.Close()
354         }
355
356         kc.SetServiceRoots(service_roots)
357
358         shuff := NewRootSorter(
359                 kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
360
361         phash, replicas, err := kc.PutB([]byte("foo"))
362
363         <-fh.handled
364
365         c.Check(err, Equals, nil)
366         c.Check(phash, Equals, "")
367         c.Check(replicas, Equals, 2)
368
369         s1 := <-st.handled
370         s2 := <-st.handled
371
372         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
373                 (s1 == shuff[2] && s2 == shuff[1]),
374                 Equals,
375                 true)
376 }
377
378 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
379         log.Printf("TestPutWithTooManyFail")
380
381         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
382
383         st := StubPutHandler{
384                 c,
385                 hash,
386                 "abc123",
387                 "foo",
388                 make(chan string, 1)}
389
390         fh := FailHandler{
391                 make(chan string, 4)}
392
393         arv, err := arvadosclient.MakeArvadosClient()
394         kc, _ := MakeKeepClient(&arv)
395
396         kc.Want_replicas = 2
397         arv.ApiToken = "abc123"
398         service_roots := make(map[string]string)
399
400         ks1 := RunSomeFakeKeepServers(st, 1)
401         ks2 := RunSomeFakeKeepServers(fh, 4)
402
403         for i, k := range ks1 {
404                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
405                 defer k.listener.Close()
406         }
407         for i, k := range ks2 {
408                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
409                 defer k.listener.Close()
410         }
411
412         kc.SetServiceRoots(service_roots)
413
414         _, replicas, err := kc.PutB([]byte("foo"))
415
416         c.Check(err, Equals, InsufficientReplicasError)
417         c.Check(replicas, Equals, 1)
418         c.Check(<-st.handled, Equals, ks1[0].url)
419
420         log.Printf("TestPutWithTooManyFail done")
421 }
422
423 type StubGetHandler struct {
424         c              *C
425         expectPath     string
426         expectApiToken string
427         returnBody     []byte
428 }
429
430 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
431         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
432         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
433         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
434         resp.Write(this.returnBody)
435 }
436
437 func (s *StandaloneSuite) TestGet(c *C) {
438         log.Printf("TestGet")
439
440         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
441
442         st := StubGetHandler{
443                 c,
444                 hash,
445                 "abc123",
446                 []byte("foo")}
447
448         ks := RunFakeKeepServer(st)
449         defer ks.listener.Close()
450
451         arv, err := arvadosclient.MakeArvadosClient()
452         kc, _ := MakeKeepClient(&arv)
453         arv.ApiToken = "abc123"
454         kc.SetServiceRoots(map[string]string{"x": ks.url})
455
456         r, n, url2, err := kc.Get(hash)
457         defer r.Close()
458         c.Check(err, Equals, nil)
459         c.Check(n, Equals, int64(3))
460         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
461
462         content, err2 := ioutil.ReadAll(r)
463         c.Check(err2, Equals, nil)
464         c.Check(content, DeepEquals, []byte("foo"))
465
466         log.Printf("TestGet done")
467 }
468
469 func (s *StandaloneSuite) TestGetFail(c *C) {
470         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
471
472         st := FailHandler{make(chan string, 1)}
473
474         ks := RunFakeKeepServer(st)
475         defer ks.listener.Close()
476
477         arv, err := arvadosclient.MakeArvadosClient()
478         kc, _ := MakeKeepClient(&arv)
479         arv.ApiToken = "abc123"
480         kc.SetServiceRoots(map[string]string{"x": ks.url})
481
482         r, n, url2, err := kc.Get(hash)
483         c.Check(err, Equals, BlockNotFound)
484         c.Check(n, Equals, int64(0))
485         c.Check(url2, Equals, "")
486         c.Check(r, Equals, nil)
487 }
488
489 type BarHandler struct {
490         handled chan string
491 }
492
493 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
494         resp.Write([]byte("bar"))
495         this.handled <- fmt.Sprintf("http://%s", req.Host)
496 }
497
498 func (s *StandaloneSuite) TestChecksum(c *C) {
499         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
500         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
501
502         st := BarHandler{make(chan string, 1)}
503
504         ks := RunFakeKeepServer(st)
505         defer ks.listener.Close()
506
507         arv, err := arvadosclient.MakeArvadosClient()
508         kc, _ := MakeKeepClient(&arv)
509         arv.ApiToken = "abc123"
510         kc.SetServiceRoots(map[string]string{"x": ks.url})
511
512         r, n, _, err := kc.Get(barhash)
513         _, err = ioutil.ReadAll(r)
514         c.Check(n, Equals, int64(3))
515         c.Check(err, Equals, nil)
516
517         <-st.handled
518
519         r, n, _, err = kc.Get(foohash)
520         _, err = ioutil.ReadAll(r)
521         c.Check(n, Equals, int64(3))
522         c.Check(err, Equals, BadChecksum)
523
524         <-st.handled
525 }
526
527 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
528         content := []byte("waz")
529         hash := fmt.Sprintf("%x", md5.Sum(content))
530
531         fh := FailHandler{
532                 make(chan string, 4)}
533
534         st := StubGetHandler{
535                 c,
536                 hash,
537                 "abc123",
538                 content}
539
540         arv, err := arvadosclient.MakeArvadosClient()
541         kc, _ := MakeKeepClient(&arv)
542         arv.ApiToken = "abc123"
543         service_roots := make(map[string]string)
544
545         ks1 := RunSomeFakeKeepServers(st, 1)
546         ks2 := RunSomeFakeKeepServers(fh, 4)
547
548         for i, k := range ks1 {
549                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
550                 defer k.listener.Close()
551         }
552         for i, k := range ks2 {
553                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
554                 defer k.listener.Close()
555         }
556
557         kc.SetServiceRoots(service_roots)
558
559         // This test works only if one of the failing services is
560         // attempted before the succeeding service. Otherwise,
561         // <-fh.handled below will just hang! (Probe order depends on
562         // the choice of block content "waz" and the UUIDs of the fake
563         // servers, so we just tried different strings until we found
564         // an example that passes this Assert.)
565         c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
566
567         r, n, url2, err := kc.Get(hash)
568
569         <-fh.handled
570         c.Check(err, Equals, nil)
571         c.Check(n, Equals, int64(3))
572         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
573
574         read_content, err2 := ioutil.ReadAll(r)
575         c.Check(err2, Equals, nil)
576         c.Check(read_content, DeepEquals, content)
577 }
578
579 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
580         content := []byte("TestPutGetHead")
581
582         arv, err := arvadosclient.MakeArvadosClient()
583         kc, err := MakeKeepClient(&arv)
584         c.Assert(err, Equals, nil)
585
586         hash := fmt.Sprintf("%x", md5.Sum(content))
587
588         {
589                 n, _, err := kc.Ask(hash)
590                 c.Check(err, Equals, BlockNotFound)
591                 c.Check(n, Equals, int64(0))
592         }
593         {
594                 hash2, replicas, err := kc.PutB(content)
595                 c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
596                 c.Check(replicas, Equals, 2)
597                 c.Check(err, Equals, nil)
598         }
599         {
600                 r, n, url2, err := kc.Get(hash)
601                 c.Check(err, Equals, nil)
602                 c.Check(n, Equals, int64(len(content)))
603                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
604
605                 read_content, err2 := ioutil.ReadAll(r)
606                 c.Check(err2, Equals, nil)
607                 c.Check(read_content, DeepEquals, content)
608         }
609         {
610                 n, url2, err := kc.Ask(hash)
611                 c.Check(err, Equals, nil)
612                 c.Check(n, Equals, int64(len(content)))
613                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
614         }
615 }
616
617 type StubProxyHandler struct {
618         handled chan string
619 }
620
621 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
622         resp.Header().Set("X-Keep-Replicas-Stored", "2")
623         this.handled <- fmt.Sprintf("http://%s", req.Host)
624 }
625
626 func (s *StandaloneSuite) TestPutProxy(c *C) {
627         log.Printf("TestPutProxy")
628
629         st := StubProxyHandler{make(chan string, 1)}
630
631         arv, err := arvadosclient.MakeArvadosClient()
632         kc, _ := MakeKeepClient(&arv)
633
634         kc.Want_replicas = 2
635         kc.Using_proxy = true
636         arv.ApiToken = "abc123"
637         service_roots := make(map[string]string)
638
639         ks1 := RunSomeFakeKeepServers(st, 1)
640
641         for i, k := range ks1 {
642                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
643                 defer k.listener.Close()
644         }
645
646         kc.SetServiceRoots(service_roots)
647
648         _, replicas, err := kc.PutB([]byte("foo"))
649         <-st.handled
650
651         c.Check(err, Equals, nil)
652         c.Check(replicas, Equals, 2)
653
654         log.Printf("TestPutProxy done")
655 }
656
657 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
658         log.Printf("TestPutProxy")
659
660         st := StubProxyHandler{make(chan string, 1)}
661
662         arv, err := arvadosclient.MakeArvadosClient()
663         kc, _ := MakeKeepClient(&arv)
664
665         kc.Want_replicas = 3
666         kc.Using_proxy = true
667         arv.ApiToken = "abc123"
668         service_roots := make(map[string]string)
669
670         ks1 := RunSomeFakeKeepServers(st, 1)
671
672         for i, k := range ks1 {
673                 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
674                 defer k.listener.Close()
675         }
676         kc.SetServiceRoots(service_roots)
677
678         _, replicas, err := kc.PutB([]byte("foo"))
679         <-st.handled
680
681         c.Check(err, Equals, InsufficientReplicasError)
682         c.Check(replicas, Equals, 2)
683
684         log.Printf("TestPutProxy done")
685 }
686
687 func (s *StandaloneSuite) TestMakeLocator(c *C) {
688         l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
689
690         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
691         c.Check(l.Size, Equals, 3)
692         c.Check(l.Signature, Equals, "abcde")
693         c.Check(l.Timestamp, Equals, "12345678")
694 }