4 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
5 "git.curoverse.com/arvados.git/sdk/go/streamer"
20 // Gocheck boilerplate
21 func Test(t *testing.T) {
25 // Gocheck boilerplate
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
29 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31 // Tests that require the Keep server running
32 type ServerRequiredSuite struct{}
35 type StandaloneSuite struct{}
37 func pythonDir() string {
39 return fmt.Sprintf("%s/../../python/tests", cwd)
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44 c.Skip("Skipping tests that require server")
49 cmd := exec.Command("python", "run_test_server.py", "start")
50 stderr, err := cmd.StderrPipe()
52 log.Fatalf("Setting up stderr pipe: %s", err)
54 go io.Copy(os.Stderr, stderr)
55 if err := cmd.Run(); err != nil {
56 panic(fmt.Sprintf("'python run_test_server.py start' returned error %s", err))
60 cmd := exec.Command("python", "run_test_server.py", "start_keep")
61 stderr, err := cmd.StderrPipe()
63 log.Fatalf("Setting up stderr pipe: %s", err)
65 go io.Copy(os.Stderr, stderr)
66 if err := cmd.Run(); err != nil {
67 panic(fmt.Sprintf("'python run_test_server.py start_keep' returned error %s", err))
72 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
74 exec.Command("python", "run_test_server.py", "stop_keep").Run()
75 exec.Command("python", "run_test_server.py", "stop").Run()
78 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
79 os.Setenv("ARVADOS_API_HOST", "localhost:3000")
80 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
81 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
83 arv, err := arvadosclient.MakeArvadosClient()
84 c.Assert(err, Equals, nil)
86 kc, err := MakeKeepClient(&arv)
88 c.Assert(err, Equals, nil)
89 c.Check(len(kc.ServiceRoots()), Equals, 2)
90 for _, root := range kc.ServiceRoots() {
91 c.Check(root, Matches, "http://localhost:2510[\\d]")
95 type StubPutHandler struct {
103 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
104 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
105 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
106 body, err := ioutil.ReadAll(req.Body)
107 this.c.Check(err, Equals, nil)
108 this.c.Check(body, DeepEquals, []byte(this.expectBody))
109 resp.WriteHeader(200)
110 this.handled <- fmt.Sprintf("http://%s", req.Host)
113 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
115 listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
117 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
120 url = fmt.Sprintf("http://localhost:%d", port)
122 go http.Serve(listener, st)
126 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
127 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
129 listener, url := RunBogusKeepServer(st, 2990)
130 defer listener.Close()
132 arv, _ := arvadosclient.MakeArvadosClient()
133 arv.ApiToken = "abc123"
135 kc, _ := MakeKeepClient(&arv)
137 reader, writer := io.Pipe()
138 upload_status := make(chan uploadStatus)
140 f(kc, url, reader, writer, upload_status)
143 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
144 log.Printf("TestUploadToStubKeepServer")
146 st := StubPutHandler{
148 "acbd18db4cc2f85cedef654fccc4a4d8",
153 UploadToStubHelper(c, st,
154 func(kc KeepClient, url string, reader io.ReadCloser,
155 writer io.WriteCloser, upload_status chan uploadStatus) {
157 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
159 writer.Write([]byte("foo"))
163 status := <-upload_status
164 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
167 log.Printf("TestUploadToStubKeepServer done")
170 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
171 log.Printf("TestUploadToStubKeepServerBufferReader")
173 st := StubPutHandler{
175 "acbd18db4cc2f85cedef654fccc4a4d8",
180 UploadToStubHelper(c, st,
181 func(kc KeepClient, url string, reader io.ReadCloser,
182 writer io.WriteCloser, upload_status chan uploadStatus) {
184 tr := streamer.AsyncStreamFromReader(512, reader)
187 br1 := tr.MakeStreamReader()
189 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
191 writer.Write([]byte("foo"))
196 status := <-upload_status
197 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
200 log.Printf("TestUploadToStubKeepServerBufferReader done")
203 type FailHandler struct {
207 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
208 resp.WriteHeader(500)
209 this.handled <- fmt.Sprintf("http://%s", req.Host)
212 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
213 log.Printf("TestFailedUploadToStubKeepServer")
218 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
220 UploadToStubHelper(c, st,
221 func(kc KeepClient, url string, reader io.ReadCloser,
222 writer io.WriteCloser, upload_status chan uploadStatus) {
224 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
226 writer.Write([]byte("foo"))
231 status := <-upload_status
232 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
233 c.Check(status.statusCode, Equals, 500)
235 log.Printf("TestFailedUploadToStubKeepServer done")
238 type KeepServer struct {
239 listener net.Listener
243 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
244 ks = make([]KeepServer, n)
246 for i := 0; i < n; i += 1 {
247 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
248 ks[i] = KeepServer{boguslistener, bogusurl}
254 func (s *StandaloneSuite) TestPutB(c *C) {
255 log.Printf("TestPutB")
257 hash := Md5String("foo")
259 st := StubPutHandler{
264 make(chan string, 5)}
266 arv, _ := arvadosclient.MakeArvadosClient()
267 kc, _ := MakeKeepClient(&arv)
270 arv.ApiToken = "abc123"
271 service_roots := make(map[string]string)
273 ks := RunSomeFakeKeepServers(st, 5, 2990)
275 for i := 0; i < len(ks); i += 1 {
276 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
277 defer ks[i].listener.Close()
280 kc.SetServiceRoots(service_roots)
282 kc.PutB([]byte("foo"))
284 shuff := NewRootSorter(
285 kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
289 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
290 (s1 == shuff[1] && s2 == shuff[0]),
294 log.Printf("TestPutB done")
297 func (s *StandaloneSuite) TestPutHR(c *C) {
298 log.Printf("TestPutHR")
300 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
302 st := StubPutHandler{
307 make(chan string, 5)}
309 arv, _ := arvadosclient.MakeArvadosClient()
310 kc, _ := MakeKeepClient(&arv)
313 arv.ApiToken = "abc123"
314 service_roots := make(map[string]string)
316 ks := RunSomeFakeKeepServers(st, 5, 2990)
318 for i := 0; i < len(ks); i += 1 {
319 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = ks[i].url
320 defer ks[i].listener.Close()
323 kc.SetServiceRoots(service_roots)
325 reader, writer := io.Pipe()
328 writer.Write([]byte("foo"))
332 kc.PutHR(hash, reader, 3)
334 shuff := NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots()
340 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
341 (s1 == shuff[1] && s2 == shuff[0]),
345 log.Printf("TestPutHR done")
348 func (s *StandaloneSuite) TestPutWithFail(c *C) {
349 log.Printf("TestPutWithFail")
351 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
353 st := StubPutHandler{
358 make(chan string, 4)}
361 make(chan string, 1)}
363 arv, err := arvadosclient.MakeArvadosClient()
364 kc, _ := MakeKeepClient(&arv)
367 arv.ApiToken = "abc123"
368 service_roots := make(map[string]string)
370 ks1 := RunSomeFakeKeepServers(st, 4, 2990)
371 ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
373 for i, k := range ks1 {
374 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
375 defer k.listener.Close()
377 for i, k := range ks2 {
378 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
379 defer k.listener.Close()
382 kc.SetServiceRoots(service_roots)
384 shuff := NewRootSorter(
385 kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
387 phash, replicas, err := kc.PutB([]byte("foo"))
391 c.Check(err, Equals, nil)
392 c.Check(phash, Equals, "")
393 c.Check(replicas, Equals, 2)
398 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
399 (s1 == shuff[2] && s2 == shuff[1]),
404 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
405 log.Printf("TestPutWithTooManyFail")
407 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
409 st := StubPutHandler{
414 make(chan string, 1)}
417 make(chan string, 4)}
419 arv, err := arvadosclient.MakeArvadosClient()
420 kc, _ := MakeKeepClient(&arv)
423 arv.ApiToken = "abc123"
424 service_roots := make(map[string]string)
426 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
427 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
429 for i, k := range ks1 {
430 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
431 defer k.listener.Close()
433 for i, k := range ks2 {
434 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
435 defer k.listener.Close()
438 kc.SetServiceRoots(service_roots)
440 _, replicas, err := kc.PutB([]byte("foo"))
442 c.Check(err, Equals, InsufficientReplicasError)
443 c.Check(replicas, Equals, 1)
444 c.Check(<-st.handled, Matches, ".*2990")
446 log.Printf("TestPutWithTooManyFail done")
449 type StubGetHandler struct {
452 expectApiToken string
456 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
457 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
458 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
459 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
460 resp.Write(this.returnBody)
463 func (s *StandaloneSuite) TestGet(c *C) {
464 log.Printf("TestGet")
466 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
468 st := StubGetHandler{
474 listener, url := RunBogusKeepServer(st, 2990)
475 defer listener.Close()
477 arv, err := arvadosclient.MakeArvadosClient()
478 kc, _ := MakeKeepClient(&arv)
479 arv.ApiToken = "abc123"
480 kc.SetServiceRoots(map[string]string{"x":url})
482 r, n, url2, err := kc.Get(hash)
484 c.Check(err, Equals, nil)
485 c.Check(n, Equals, int64(3))
486 c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
488 content, err2 := ioutil.ReadAll(r)
489 c.Check(err2, Equals, nil)
490 c.Check(content, DeepEquals, []byte("foo"))
492 log.Printf("TestGet done")
495 func (s *StandaloneSuite) TestGetFail(c *C) {
496 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
498 st := FailHandler{make(chan string, 1)}
500 listener, url := RunBogusKeepServer(st, 2990)
501 defer listener.Close()
503 arv, err := arvadosclient.MakeArvadosClient()
504 kc, _ := MakeKeepClient(&arv)
505 arv.ApiToken = "abc123"
506 kc.SetServiceRoots(map[string]string{"x":url})
508 r, n, url2, err := kc.Get(hash)
509 c.Check(err, Equals, BlockNotFound)
510 c.Check(n, Equals, int64(0))
511 c.Check(url2, Equals, "")
512 c.Check(r, Equals, nil)
515 type BarHandler struct {
519 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
520 resp.Write([]byte("bar"))
521 this.handled <- fmt.Sprintf("http://%s", req.Host)
524 func (s *StandaloneSuite) TestChecksum(c *C) {
525 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
526 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
528 st := BarHandler{make(chan string, 1)}
530 listener, url := RunBogusKeepServer(st, 2990)
531 defer listener.Close()
533 arv, err := arvadosclient.MakeArvadosClient()
534 kc, _ := MakeKeepClient(&arv)
535 arv.ApiToken = "abc123"
536 kc.SetServiceRoots(map[string]string{"x":url})
538 r, n, _, err := kc.Get(barhash)
539 _, err = ioutil.ReadAll(r)
540 c.Check(n, Equals, int64(3))
541 c.Check(err, Equals, nil)
545 r, n, _, err = kc.Get(foohash)
546 _, err = ioutil.ReadAll(r)
547 c.Check(n, Equals, int64(3))
548 c.Check(err, Equals, BadChecksum)
553 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
554 content := []byte("waz")
555 hash := fmt.Sprintf("%x", md5.Sum(content))
558 make(chan string, 4)}
560 st := StubGetHandler{
566 arv, err := arvadosclient.MakeArvadosClient()
567 kc, _ := MakeKeepClient(&arv)
568 arv.ApiToken = "abc123"
569 service_roots := make(map[string]string)
571 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
572 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
574 for i, k := range ks1 {
575 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
576 defer k.listener.Close()
578 for i, k := range ks2 {
579 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
580 defer k.listener.Close()
583 kc.SetServiceRoots(service_roots)
585 // This test works only if one of the failing services is
586 // attempted before the succeeding service. Otherwise,
587 // <-fh.handled below will just hang! (Probe order depends on
588 // the choice of block content "waz" and the UUIDs of the fake
589 // servers, so we just tried different strings until we found
590 // an example that passes this Assert.)
591 c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Matches, ".*299[1-4]")
593 r, n, url2, err := kc.Get(hash)
596 c.Check(err, Equals, nil)
597 c.Check(n, Equals, int64(3))
598 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
600 read_content, err2 := ioutil.ReadAll(r)
601 c.Check(err2, Equals, nil)
602 c.Check(read_content, DeepEquals, content)
605 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
606 os.Setenv("ARVADOS_API_HOST", "localhost:3000")
607 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
608 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
609 content := []byte("TestPutGetHead")
611 arv, err := arvadosclient.MakeArvadosClient()
612 kc, err := MakeKeepClient(&arv)
613 c.Assert(err, Equals, nil)
615 hash := fmt.Sprintf("%x", md5.Sum(content))
618 n, _, err := kc.Ask(hash)
619 c.Check(err, Equals, BlockNotFound)
620 c.Check(n, Equals, int64(0))
623 hash2, replicas, err := kc.PutB(content)
624 c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
625 c.Check(replicas, Equals, 2)
626 c.Check(err, Equals, nil)
629 r, n, url2, err := kc.Get(hash)
630 c.Check(err, Equals, nil)
631 c.Check(n, Equals, int64(len(content)))
632 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
634 read_content, err2 := ioutil.ReadAll(r)
635 c.Check(err2, Equals, nil)
636 c.Check(read_content, DeepEquals, content)
639 n, url2, err := kc.Ask(hash)
640 c.Check(err, Equals, nil)
641 c.Check(n, Equals, int64(len(content)))
642 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
646 type StubProxyHandler struct {
650 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
651 resp.Header().Set("X-Keep-Replicas-Stored", "2")
652 this.handled <- fmt.Sprintf("http://%s", req.Host)
655 func (s *StandaloneSuite) TestPutProxy(c *C) {
656 log.Printf("TestPutProxy")
658 st := StubProxyHandler{make(chan string, 1)}
660 arv, err := arvadosclient.MakeArvadosClient()
661 kc, _ := MakeKeepClient(&arv)
664 kc.Using_proxy = true
665 arv.ApiToken = "abc123"
666 service_roots := make(map[string]string)
668 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
670 for i, k := range ks1 {
671 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
672 defer k.listener.Close()
675 kc.SetServiceRoots(service_roots)
677 _, replicas, err := kc.PutB([]byte("foo"))
680 c.Check(err, Equals, nil)
681 c.Check(replicas, Equals, 2)
683 log.Printf("TestPutProxy done")
686 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
687 log.Printf("TestPutProxy")
689 st := StubProxyHandler{make(chan string, 1)}
691 arv, err := arvadosclient.MakeArvadosClient()
692 kc, _ := MakeKeepClient(&arv)
695 kc.Using_proxy = true
696 arv.ApiToken = "abc123"
697 service_roots := make(map[string]string)
699 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
701 for i, k := range ks1 {
702 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
703 defer k.listener.Close()
705 kc.SetServiceRoots(service_roots)
707 _, replicas, err := kc.PutB([]byte("foo"))
710 c.Check(err, Equals, InsufficientReplicasError)
711 c.Check(replicas, Equals, 2)
713 log.Printf("TestPutProxy done")
716 func (s *StandaloneSuite) TestMakeLocator(c *C) {
717 l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
719 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
720 c.Check(l.Size, Equals, 3)
721 c.Check(l.Signature, Equals, "abcde")
722 c.Check(l.Timestamp, Equals, "12345678")