4 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
5 "git.curoverse.com/arvados.git/sdk/go/streamer"
20 // Gocheck boilerplate
// Test is the `go test` entry point for this package.
// NOTE(review): presumably it hands control to the gocheck runner — confirm.
21 func Test(t *testing.T) {
25 // Gocheck boilerplate
// Register both suites by passing an instance of each to Suite; the results
// are discarded, the declarations exist only for that call.
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
// no_server is the -no-server command-line flag (default false). Per its
// usage text, setting it requests that ServerRequiredSuite be skipped.
var no_server = flag.Bool("no-server", false, "Skip 'ServerRequiredSuite'")
31 // Tests that require the Keep server running
// The suite's SetUpSuite method launches run_test_server.py before these
// tests run, and TearDownSuite stops it afterwards.
32 type ServerRequiredSuite struct{}
// StandaloneSuite holds the remaining tests; the ones that need a server
// start stub HTTP listeners themselves (see RunBogusKeepServer).
35 type StandaloneSuite struct{}
// pythonDir returns the path "<cwd>/../../python/tests".
// NOTE(review): presumably cwd is the process working directory — confirm.
37 func pythonDir() string {
39 return fmt.Sprintf("%s/../../python/tests", cwd)
// SetUpSuite prepares the environment for the server-backed tests. It runs
// "python run_test_server.py start" and then "python run_test_server.py
// start_keep", copying each command's stderr to os.Stderr, and panics if
// either command returns an error.
// NOTE(review): presumably the c.Skip call is guarded by the -no-server
// flag — confirm.
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44 c.Skip("Skipping tests that require server")
// First command: "start".
49 cmd := exec.Command("python", "run_test_server.py", "start")
50 stderr, err := cmd.StderrPipe()
52 log.Fatalf("Setting up stderr pipe: %s", err)
// NOTE(review): the os/exec documentation says Run must not be combined with
// StderrPipe, because Wait closes the pipe and can cut off pending reads.
// Assigning cmd.Stderr = os.Stderr would avoid both the pipe and the goroutine.
54 go io.Copy(os.Stderr, stderr)
55 if err := cmd.Run(); err != nil {
56 panic(fmt.Sprintf("'python run_test_server.py start' returned error %s", err))
// Second command: "start_keep", with the same stderr handling as above.
60 cmd := exec.Command("python", "run_test_server.py", "start_keep")
61 stderr, err := cmd.StderrPipe()
63 log.Fatalf("Setting up stderr pipe: %s", err)
65 go io.Copy(os.Stderr, stderr)
66 if err := cmd.Run(); err != nil {
67 panic(fmt.Sprintf("'python run_test_server.py start_keep' returned error %s", err))
// TearDownSuite runs the "stop_keep" and then the "stop" command of
// run_test_server.py. The errors returned by Run are discarded, so a failed
// shutdown goes unreported.
72 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
74 exec.Command("python", "run_test_server.py", "stop_keep").Run()
75 exec.Command("python", "run_test_server.py", "stop").Run()
// TestMakeKeepClient configures the Arvados client through environment
// variables (API host localhost:3000, a fixed token, insecure mode), builds a
// KeepClient from it, and checks that exactly two service roots are reported,
// each of the form http://localhost:2510N for a single digit N.
78 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
79 os.Setenv("ARVADOS_API_HOST", "localhost:3000")
80 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
81 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
// Assert (not Check): the remaining steps are meaningless without a client.
83 arv, err := arvadosclient.MakeArvadosClient()
84 c.Assert(err, Equals, nil)
86 kc, err := MakeKeepClient(&arv)
88 c.Assert(err, Equals, nil)
89 c.Check(len(kc.ServiceRoots()), Equals, 2)
90 for _, root := range kc.ServiceRoots() {
91 c.Check(root, Matches, "http://localhost:2510[\\d]")
// TestShuffleServiceRoots installs four service roots and checks that
// shuffledServiceRoots returns them in the expected order for two different
// block hashes; the expected order differs between the two hashes.
95 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
// Keys are service UUIDs, values are the corresponding base URLs.
96 roots := map[string]string{
97 "zzzzz-bi6l4-2q7dq8becevdqfb": "http://localhost:1",
98 "zzzzz-bi6l4-4gbhck2w7lq0d96": "http://localhost:2",
99 "zzzzz-bi6l4-4bt69dsk0quh7ae": "http://localhost:3",
100 "zzzzz-bi6l4-62w1fgd0ud2krxl": "http://localhost:4",
103 kc.SetServiceRoots(roots)
105 // "foo" acbd18db4cc2f85cedef654fccc4a4d8
106 foo_shuffle := []string{"http://localhost:4", "http://localhost:1", "http://localhost:3", "http://localhost:2"}
107 c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
109 // "bar" 37b51d194a7513e45b56f6524f2d51f2
110 bar_shuffle := []string{"http://localhost:3", "http://localhost:2", "http://localhost:4", "http://localhost:1"}
111 c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
// StubPutHandler is an http.Handler test double used by the upload tests: its
// ServeHTTP method compares each request against the expect* fields and
// reports the handled host on a channel.
114 type StubPutHandler struct {
// Token expected after "OAuth2 " in the request's Authorization header.
117 expectApiToken string
// ServeHTTP checks the request path, the OAuth2 Authorization header and the
// request body against the handler's expectations, replies with status 200,
// and then sends "http://<request host>" on the handled channel.
// NOTE(review): Go convention prefers a short receiver name over "this".
122 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
123 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
124 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
125 body, err := ioutil.ReadAll(req.Body)
126 this.c.Check(err, Equals, nil)
127 this.c.Check(body, DeepEquals, []byte(this.expectBody))
128 resp.WriteHeader(200)
// This send blocks when the channel has no free buffer slot and no receiver.
129 this.handled <- fmt.Sprintf("http://%s", req.Host)
// RunBogusKeepServer listens on the given TCP port and serves st on that
// listener in a background goroutine. It returns the listener, which the
// callers in this file close with defer, and the base URL
// "http://localhost:<port>". It panics with "Could not listen on tcp port
// <port>" on the failure path.
132 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
134 listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
136 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
139 url = fmt.Sprintf("http://localhost:%d", port)
// http.Serve returns once the listener is closed; its error is not inspected.
141 go http.Serve(listener, st)
// UploadToStubHelper starts a stub server for st on port 2990, builds a
// KeepClient whose Arvados client uses the API token "abc123", creates an
// io.Pipe and an uploadStatus channel, and passes all of them to f. The
// listener is closed when the helper returns.
// NOTE(review): the errors from MakeArvadosClient and MakeKeepClient are
// discarded; the fixed port means callers cannot run concurrently.
145 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
146 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
148 listener, url := RunBogusKeepServer(st, 2990)
149 defer listener.Close()
151 arv, _ := arvadosclient.MakeArvadosClient()
152 arv.ApiToken = "abc123"
154 kc, _ := MakeKeepClient(&arv)
// The channel is unbuffered, so the uploader's send blocks until f receives.
156 reader, writer := io.Pipe()
157 upload_status := make(chan uploadStatus)
159 f(kc, url, reader, writer, upload_status)
// TestUploadToStubKeepServer runs uploadToKeepServer in a goroutine, feeds
// it "foo" through the pipe writer, and expects the reported uploadStatus to
// carry a nil error, the URL <url>/<expectPath> and status code 200.
162 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
163 log.Printf("TestUploadToStubKeepServer")
165 st := StubPutHandler{
167 "acbd18db4cc2f85cedef654fccc4a4d8",
172 UploadToStubHelper(c, st,
173 func(kc KeepClient, url string, reader io.ReadCloser,
174 writer io.WriteCloser, upload_status chan uploadStatus) {
// The uploader reads from the pipe, so it must run concurrently with the write below.
176 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
178 writer.Write([]byte("foo"))
// NOTE(review): the meaning of the 4th and 5th uploadStatus fields (1, "")
// cannot be seen here; presumably replica count and response text — confirm.
182 status := <-upload_status
183 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
186 log.Printf("TestUploadToStubKeepServer done")
// TestUploadToStubKeepServerBufferReader is the same upload check as
// TestUploadToStubKeepServer, except that the uploader reads from a stream
// reader obtained from streamer.AsyncStreamFromReader instead of reading the
// pipe directly.
189 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
190 log.Printf("TestUploadToStubKeepServerBufferReader")
192 st := StubPutHandler{
194 "acbd18db4cc2f85cedef654fccc4a4d8",
199 UploadToStubHelper(c, st,
200 func(kc KeepClient, url string, reader io.ReadCloser,
201 writer io.WriteCloser, upload_status chan uploadStatus) {
// NOTE(review): 512 is presumably the stream buffer size in bytes — confirm.
203 tr := streamer.AsyncStreamFromReader(512, reader)
206 br1 := tr.MakeStreamReader()
// The final argument 3 equals len("foo"), the number of bytes written below.
208 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
210 writer.Write([]byte("foo"))
215 status := <-upload_status
216 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
219 log.Printf("TestUploadToStubKeepServerBufferReader done")
// FailHandler is an http.Handler that answers every request with status 500
// and reports the handled host on its handled channel (see ServeHTTP).
222 type FailHandler struct {
// ServeHTTP replies with status 500 and then sends "http://<request host>"
// on the handled channel.
226 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
227 resp.WriteHeader(500)
// This send blocks when the channel has no free buffer slot and no receiver.
228 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestFailedUploadToStubKeepServer uploads "foo" to a stub server and checks
// that the reported uploadStatus carries the request URL <url>/<hash> and
// status code 500.
// NOTE(review): st is presumably a FailHandler, given the expected 500 — confirm.
231 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
232 log.Printf("TestFailedUploadToStubKeepServer")
// md5 of "foo" (the same value is computed with md5.Sum in other tests).
237 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
239 UploadToStubHelper(c, st,
240 func(kc KeepClient, url string, reader io.ReadCloser,
241 writer io.WriteCloser, upload_status chan uploadStatus) {
243 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
245 writer.Write([]byte("foo"))
// Only url and statusCode are compared; the other status fields are not pinned.
250 status := <-upload_status
251 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
252 c.Check(status.statusCode, Equals, 500)
254 log.Printf("TestFailedUploadToStubKeepServer done")
// KeepServer pairs a stub server's listener with its base URL, as returned
// by RunBogusKeepServer.
257 type KeepServer struct {
// Listener the stub server accepts on; tests close it with defer.
258 listener net.Listener
// RunSomeFakeKeepServers starts n stub servers that all serve st, on the
// consecutive ports port, port+1, ..., port+n-1, and stores each
// listener/URL pair in the result slice ks.
262 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
263 ks = make([]KeepServer, n)
265 for i := 0; i < n; i += 1 {
// RunBogusKeepServer panics if a port cannot be bound, so no error is handled here.
266 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
267 ks[i] = KeepServer{boguslistener, bogusurl}
// TestPutB stores "foo" with PutB against five stub servers on ports
// 2990-2994 and checks that s1 and s2 are the first two entries of the
// shuffled root order for the block's hash, in either order.
// NOTE(review): s1 and s2 are presumably the first two hosts received from
// st.handled — confirm.
273 func (s *StandaloneSuite) TestPutB(c *C) {
274 log.Printf("TestPutB")
276 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
278 st := StubPutHandler{
// Buffer of 5: one slot per stub server, so no handler blocks on its send.
283 make(chan string, 5)}
// NOTE(review): both constructor errors are discarded.
285 arv, _ := arvadosclient.MakeArvadosClient()
286 kc, _ := MakeKeepClient(&arv)
// The token is set after kc was built from &arv; presumably kc keeps the
// pointer and therefore sees the change — confirm.
289 arv.ApiToken = "abc123"
290 service_roots := make(map[string]string)
292 ks := RunSomeFakeKeepServers(st, 5, 2990)
294 for i := 0; i < len(ks); i += 1 {
295 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
// Deferred on purpose: every listener stays open until the test returns.
296 defer ks[i].listener.Close()
299 kc.SetServiceRoots(service_roots)
// NOTE(review): PutB's return values, including its error, are ignored here.
301 kc.PutB([]byte("foo"))
// Same expression as hash above; the existing variable could be reused.
303 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
307 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
308 (s1 == shuff[1] && s2 == shuff[0]),
312 log.Printf("TestPutB done")
// TestPutHR stores "foo" with PutHR, reading the data from an io.Pipe,
// against five stub servers on ports 2990-2994. It checks that s1 and s2 are
// the first two entries of the shuffled root order for the hash, in either
// order.
// NOTE(review): s1 and s2 are presumably the first two hosts received from
// st.handled — confirm.
315 func (s *StandaloneSuite) TestPutHR(c *C) {
316 log.Printf("TestPutHR")
318 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
320 st := StubPutHandler{
// Buffer of 5: one slot per stub server, so no handler blocks on its send.
325 make(chan string, 5)}
// NOTE(review): both constructor errors are discarded.
327 arv, _ := arvadosclient.MakeArvadosClient()
328 kc, _ := MakeKeepClient(&arv)
331 arv.ApiToken = "abc123"
332 service_roots := make(map[string]string)
334 ks := RunSomeFakeKeepServers(st, 5, 2990)
336 for i := 0; i < len(ks); i += 1 {
337 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
// Deferred on purpose: every listener stays open until the test returns.
338 defer ks[i].listener.Close()
341 kc.SetServiceRoots(service_roots)
343 reader, writer := io.Pipe()
// A pipe write blocks until it is read, so this presumably runs in its own
// goroutine — confirm.
346 writer.Write([]byte("foo"))
// The final argument 3 equals len("foo"). PutHR's return values are ignored.
350 kc.PutHR(hash, reader, 3)
352 shuff := kc.shuffledServiceRoots(hash)
358 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
359 (s1 == shuff[1] && s2 == shuff[0]),
363 log.Printf("TestPutHR done")
// TestPutWithFail runs PutB against four servers backed by st (ports
// 2990-2993) plus one backed by fh (port 2995). It expects PutB to return a
// nil error with two replicas, and expects the two hosts reported by st to be
// the second and third entries of the shuffled root order.
// NOTE(review): this presumably relies on fh's server being first in the
// shuffled order for this hash — confirm.
366 func (s *StandaloneSuite) TestPutWithFail(c *C) {
367 log.Printf("TestPutWithFail")
369 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
371 st := StubPutHandler{
376 make(chan string, 4)}
379 make(chan string, 1)}
// NOTE(review): this err is not checked before PutB reassigns it below.
381 arv, err := arvadosclient.MakeArvadosClient()
382 kc, _ := MakeKeepClient(&arv)
385 arv.ApiToken = "abc123"
386 service_roots := make(map[string]string)
388 ks1 := RunSomeFakeKeepServers(st, 4, 2990)
389 ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
391 for i, k := range ks1 {
392 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
393 defer k.listener.Close()
// Offset the index by len(ks1) so the UUIDs do not collide with the first group.
395 for i, k := range ks2 {
396 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
397 defer k.listener.Close()
400 kc.SetServiceRoots(service_roots)
402 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
404 phash, replicas, err := kc.PutB([]byte("foo"))
// The test expects an empty returned hash even though the call succeeds.
408 c.Check(err, Equals, nil)
409 c.Check(phash, Equals, "")
410 c.Check(replicas, Equals, 2)
411 c.Check(<-st.handled, Equals, shuff[1])
412 c.Check(<-st.handled, Equals, shuff[2])
// TestPutWithTooManyFail runs PutB against one server backed by st (port
// 2990) and four backed by fh (ports 2991-2994). It expects
// InsufficientReplicasError with a replica count of 1, and expects the host
// reported by st to end in 2990.
415 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
416 log.Printf("TestPutWithTooManyFail")
418 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
420 st := StubPutHandler{
425 make(chan string, 1)}
428 make(chan string, 4)}
// NOTE(review): this err is not checked before PutB reassigns it below.
430 arv, err := arvadosclient.MakeArvadosClient()
431 kc, _ := MakeKeepClient(&arv)
434 arv.ApiToken = "abc123"
435 service_roots := make(map[string]string)
437 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
438 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
440 for i, k := range ks1 {
441 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
442 defer k.listener.Close()
// Offset the index by len(ks1) so the UUIDs do not collide with the first group.
444 for i, k := range ks2 {
445 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
446 defer k.listener.Close()
449 kc.SetServiceRoots(service_roots)
451 _, replicas, err := kc.PutB([]byte("foo"))
453 c.Check(err, Equals, InsufficientReplicasError)
454 c.Check(replicas, Equals, 1)
455 c.Check(<-st.handled, Matches, ".*2990")
457 log.Printf("TestPutWithTooManyFail done")
// StubGetHandler is an http.Handler test double for the Get tests: its
// ServeHTTP method checks the request path and Authorization header, then
// replies with returnBody.
460 type StubGetHandler struct {
// Token expected after "OAuth2 " in the request's Authorization header.
463 expectApiToken string
// ServeHTTP checks the request path and the OAuth2 Authorization header
// against the handler's expectations, sets Content-Length to the length of
// returnBody, and writes returnBody as the response body.
467 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
468 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
469 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
470 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
471 resp.Write(this.returnBody)
// TestGet fetches the block for md5("foo") from a single stub server on port
// 2990 and checks the reported size (3), the source URL <url>/<hash> and the
// content read from the returned reader ("foo").
474 func (s *StandaloneSuite) TestGet(c *C) {
475 log.Printf("TestGet")
477 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
479 st := StubGetHandler{
485 listener, url := RunBogusKeepServer(st, 2990)
486 defer listener.Close()
// NOTE(review): this err is not checked before Get reassigns it below.
488 arv, err := arvadosclient.MakeArvadosClient()
489 kc, _ := MakeKeepClient(&arv)
490 arv.ApiToken = "abc123"
// A single service root; the key "x" stands in for a service UUID.
491 kc.SetServiceRoots(map[string]string{"x":url})
493 r, n, url2, err := kc.Get(hash)
495 c.Check(err, Equals, nil)
496 c.Check(n, Equals, int64(3))
497 c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
499 content, err2 := ioutil.ReadAll(r)
500 c.Check(err2, Equals, nil)
501 c.Check(content, DeepEquals, []byte("foo"))
503 log.Printf("TestGet done")
// TestGetFail points the client at a single FailHandler server (always 500)
// and checks that Get returns BlockNotFound with a zero size, an empty URL
// and a nil reader.
506 func (s *StandaloneSuite) TestGetFail(c *C) {
507 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
509 st := FailHandler{make(chan string, 1)}
511 listener, url := RunBogusKeepServer(st, 2990)
512 defer listener.Close()
// NOTE(review): this err is not checked before Get reassigns it below.
514 arv, err := arvadosclient.MakeArvadosClient()
515 kc, _ := MakeKeepClient(&arv)
516 arv.ApiToken = "abc123"
517 kc.SetServiceRoots(map[string]string{"x":url})
519 r, n, url2, err := kc.Get(hash)
520 c.Check(err, Equals, BlockNotFound)
521 c.Check(n, Equals, int64(0))
522 c.Check(url2, Equals, "")
523 c.Check(r, Equals, nil)
// BarHandler is an http.Handler that answers every request with the body
// "bar" and reports the handled host on its handled channel (see ServeHTTP).
526 type BarHandler struct {
// ServeHTTP writes "bar" as the response body, whatever was requested, and
// then sends "http://<request host>" on the handled channel.
530 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
531 resp.Write([]byte("bar"))
// This send blocks when the channel has no free buffer slot and no receiver.
532 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestChecksum talks to a server that always returns "bar". Requesting the
// hash of "bar" must read back without error; requesting the hash of "foo"
// must produce BadChecksum when the returned reader is read.
535 func (s *StandaloneSuite) TestChecksum(c *C) {
536 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
537 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
539 st := BarHandler{make(chan string, 1)}
541 listener, url := RunBogusKeepServer(st, 2990)
542 defer listener.Close()
544 arv, err := arvadosclient.MakeArvadosClient()
545 kc, _ := MakeKeepClient(&arv)
546 arv.ApiToken = "abc123"
547 kc.SetServiceRoots(map[string]string{"x":url})
// NOTE(review): the error from Get is overwritten by ReadAll's before it is
// checked, and r is read without confirming that Get succeeded.
549 r, n, _, err := kc.Get(barhash)
550 _, err = ioutil.ReadAll(r)
551 c.Check(n, Equals, int64(3))
552 c.Check(err, Equals, nil)
// Content is "bar" but the requested hash is for "foo": the mismatch is
// reported by the read, not by Get itself.
556 r, n, _, err = kc.Get(foohash)
557 _, err = ioutil.ReadAll(r)
558 c.Check(n, Equals, int64(3))
559 c.Check(err, Equals, BadChecksum)
// TestGetWithFailures configures one server backed by st (port 2990) and four
// backed by fh (ports 2991-2994), then checks that Get still succeeds and
// reports the st-backed server's URL as the source of the block "waz".
564 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
565 content := []byte("waz")
566 hash := fmt.Sprintf("%x", md5.Sum(content))
569 make(chan string, 4)}
571 st := StubGetHandler{
// NOTE(review): this err is not checked before Get reassigns it below.
577 arv, err := arvadosclient.MakeArvadosClient()
578 kc, _ := MakeKeepClient(&arv)
579 arv.ApiToken = "abc123"
580 service_roots := make(map[string]string)
582 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
583 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
585 for i, k := range ks1 {
586 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
587 defer k.listener.Close()
// Offset the index by len(ks1) so the UUIDs do not collide with the first group.
589 for i, k := range ks2 {
590 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
591 defer k.listener.Close()
594 kc.SetServiceRoots(service_roots)
596 // This test works only if one of the failing services is
597 // attempted before the succeeding service. Otherwise,
598 // <-fh.handled below will just hang! (Probe order depends on
599 // the choice of block content "waz" and the UUIDs of the fake
600 // servers, so we just tried different strings until we found
601 // an example that passes this Assert.)
602 c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Matches, ".*299[1-4]")
604 r, n, url2, err := kc.Get(hash)
607 c.Check(err, Equals, nil)
608 c.Check(n, Equals, int64(3))
609 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
611 read_content, err2 := ioutil.ReadAll(r)
612 c.Check(err2, Equals, nil)
613 c.Check(read_content, DeepEquals, content)
// TestPutGetHead runs a full round trip against the servers configured
// through the environment variables below. Ask must report BlockNotFound
// before the block exists; PutB must return "<hash>+3" with two replicas;
// Get and a second Ask must then report size 3 and the source URL
// http://localhost:25108/<hash>.
616 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
617 os.Setenv("ARVADOS_API_HOST", "localhost:3000")
618 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
619 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
// NOTE(review): the error from MakeArvadosClient is overwritten by
// MakeKeepClient's before it is checked.
621 arv, err := arvadosclient.MakeArvadosClient()
622 kc, err := MakeKeepClient(&arv)
623 c.Assert(err, Equals, nil)
625 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
// Step 1: the block has not been stored yet.
628 n, _, err := kc.Ask(hash)
629 c.Check(err, Equals, BlockNotFound)
630 c.Check(n, Equals, int64(0))
// Step 2: store it; the returned locator is the hash plus "+<size>".
633 hash2, replicas, err := kc.PutB([]byte("foo"))
634 c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
635 c.Check(replicas, Equals, 2)
636 c.Check(err, Equals, nil)
// Step 3: read it back and compare the content.
639 r, n, url2, err := kc.Get(hash)
640 c.Check(err, Equals, nil)
641 c.Check(n, Equals, int64(3))
642 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
644 content, err2 := ioutil.ReadAll(r)
645 c.Check(err2, Equals, nil)
646 c.Check(content, DeepEquals, []byte("foo"))
// Step 4: Ask now finds the block.
649 n, url2, err := kc.Ask(hash)
650 c.Check(err, Equals, nil)
651 c.Check(n, Equals, int64(3))
652 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
// StubProxyHandler is an http.Handler that answers every request with the
// header X-Keep-Replicas-Stored: 2 and reports the handled host on its
// handled channel (see ServeHTTP).
656 type StubProxyHandler struct {
// ServeHTTP sets the X-Keep-Replicas-Stored response header to "2" and then
// sends "http://<request host>" on the handled channel. It writes no body
// and does not set a status code explicitly.
660 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
661 resp.Header().Set("X-Keep-Replicas-Stored", "2")
662 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestPutProxy marks the client as using a proxy, points it at a single
// StubProxyHandler server on port 2990, and checks that PutB succeeds and
// reports 2 replicas, the count the stub announces in its
// X-Keep-Replicas-Stored header.
665 func (s *StandaloneSuite) TestPutProxy(c *C) {
666 log.Printf("TestPutProxy")
668 st := StubProxyHandler{make(chan string, 1)}
// NOTE(review): this err is not checked before PutB reassigns it below.
670 arv, err := arvadosclient.MakeArvadosClient()
671 kc, _ := MakeKeepClient(&arv)
674 kc.Using_proxy = true
675 arv.ApiToken = "abc123"
676 service_roots := make(map[string]string)
678 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
680 for i, k := range ks1 {
681 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
682 defer k.listener.Close()
685 kc.SetServiceRoots(service_roots)
687 _, replicas, err := kc.PutB([]byte("foo"))
690 c.Check(err, Equals, nil)
691 c.Check(replicas, Equals, 2)
693 log.Printf("TestPutProxy done")
// TestPutProxyInsufficientReplicas uses the same single-proxy setup as
// TestPutProxy but expects PutB to fail with InsufficientReplicasError while
// still reporting the 2 replicas announced by the stub.
// NOTE(review): presumably the client is configured to want more than 2
// replicas — confirm.
// NOTE(review): both log messages say "TestPutProxy" rather than this test's
// name; this looks like a copy-paste leftover.
696 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
697 log.Printf("TestPutProxy")
699 st := StubProxyHandler{make(chan string, 1)}
// NOTE(review): this err is not checked before PutB reassigns it below.
701 arv, err := arvadosclient.MakeArvadosClient()
702 kc, _ := MakeKeepClient(&arv)
705 kc.Using_proxy = true
706 arv.ApiToken = "abc123"
707 service_roots := make(map[string]string)
709 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
711 for i, k := range ks1 {
712 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
713 defer k.listener.Close()
715 kc.SetServiceRoots(service_roots)
717 _, replicas, err := kc.PutB([]byte("foo"))
720 c.Check(err, Equals, InsufficientReplicasError)
721 c.Check(replicas, Equals, 2)
723 log.Printf("TestPutProxy done")
// TestMakeLocator parses a locator string of the form
// <hash>+<size>+A<signature>@<timestamp> and checks that MakeLocator splits
// it into the Hash, Size, Signature and Timestamp fields.
726 func (s *StandaloneSuite) TestMakeLocator(c *C) {
727 l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
// The "A" prefix and the "@" separator are not part of the parsed values.
729 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
730 c.Check(l.Size, Equals, 3)
731 c.Check(l.Signature, Equals, "abcde")
732 c.Check(l.Timestamp, Equals, "12345678")