20 // Gocheck boilerplate
21 func Test(t *testing.T) {
25 // Gocheck boilerplate
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
29 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31 // Tests that require the Keep server running
32 type ServerRequiredSuite struct{}
35 type StandaloneSuite struct{}
37 func pythonDir() string {
38 gopath := os.Getenv("GOPATH")
39 return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44 c.Skip("Skipping tests that require server")
47 if err := exec.Command("python", "run_test_server.py", "start").Run(); err != nil {
48 panic("'python run_test_server.py start' returned error")
50 if err := exec.Command("python", "run_test_server.py", "start_keep").Run(); err != nil {
51 panic("'python run_test_server.py start_keep' returned error")
56 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
58 exec.Command("python", "run_test_server.py", "stop_keep").Run()
59 exec.Command("python", "run_test_server.py", "stop").Run()
62 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
63 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
64 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
65 os.Setenv("ARVADOS_API_HOST_INSECURE", "")
67 kc, err := MakeKeepClient()
68 c.Check(kc.ApiServer, Equals, "localhost:3001")
69 c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
70 c.Check(kc.ApiInsecure, Equals, false)
72 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
74 kc, err = MakeKeepClient()
75 c.Check(kc.ApiServer, Equals, "localhost:3001")
76 c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
77 c.Check(kc.ApiInsecure, Equals, true)
78 c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
80 c.Assert(err, Equals, nil)
81 c.Check(len(kc.ServiceRoots()), Equals, 2)
82 c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
83 c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
86 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
88 kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
90 // "foo" acbd18db4cc2f85cedef654fccc4a4d8
91 foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
92 c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
94 // "bar" 37b51d194a7513e45b56f6524f2d51f2
95 bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
96 c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
99 type StubPutHandler struct {
102 expectApiToken string
107 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
108 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
109 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
110 body, err := ioutil.ReadAll(req.Body)
111 this.c.Check(err, Equals, nil)
112 this.c.Check(body, DeepEquals, []byte(this.expectBody))
113 resp.WriteHeader(200)
114 this.handled <- fmt.Sprintf("http://%s", req.Host)
117 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
119 listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
121 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
124 url = fmt.Sprintf("http://localhost:%d", port)
126 go http.Serve(listener, st)
130 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
131 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
133 listener, url := RunBogusKeepServer(st, 2990)
134 defer listener.Close()
136 kc, _ := MakeKeepClient()
137 kc.ApiToken = "abc123"
139 reader, writer := io.Pipe()
140 upload_status := make(chan uploadStatus)
142 f(kc, url, reader, writer, upload_status)
145 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
146 log.Printf("TestUploadToStubKeepServer")
148 st := StubPutHandler{
150 "acbd18db4cc2f85cedef654fccc4a4d8",
155 UploadToStubHelper(c, st,
156 func(kc KeepClient, url string, reader io.ReadCloser,
157 writer io.WriteCloser, upload_status chan uploadStatus) {
159 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
161 writer.Write([]byte("foo"))
165 status := <-upload_status
166 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
169 log.Printf("TestUploadToStubKeepServer done")
172 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
173 log.Printf("TestUploadToStubKeepServerBufferReader")
175 st := StubPutHandler{
177 "acbd18db4cc2f85cedef654fccc4a4d8",
182 UploadToStubHelper(c, st,
183 func(kc KeepClient, url string, reader io.ReadCloser,
184 writer io.WriteCloser, upload_status chan uploadStatus) {
186 tr := streamer.AsyncStreamFromReader(512, reader)
189 br1 := tr.MakeStreamReader()
191 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
193 writer.Write([]byte("foo"))
198 status := <-upload_status
199 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
202 log.Printf("TestUploadToStubKeepServerBufferReader done")
205 type FailHandler struct {
209 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
210 resp.WriteHeader(500)
211 this.handled <- fmt.Sprintf("http://%s", req.Host)
214 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
215 log.Printf("TestFailedUploadToStubKeepServer")
220 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
222 UploadToStubHelper(c, st,
223 func(kc KeepClient, url string, reader io.ReadCloser,
224 writer io.WriteCloser, upload_status chan uploadStatus) {
226 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
228 writer.Write([]byte("foo"))
233 status := <-upload_status
234 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
235 c.Check(status.statusCode, Equals, 500)
237 log.Printf("TestFailedUploadToStubKeepServer done")
240 type KeepServer struct {
241 listener net.Listener
245 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
246 ks = make([]KeepServer, n)
248 for i := 0; i < n; i += 1 {
249 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
250 ks[i] = KeepServer{boguslistener, bogusurl}
256 func (s *StandaloneSuite) TestPutB(c *C) {
257 log.Printf("TestPutB")
259 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
261 st := StubPutHandler{
266 make(chan string, 2)}
268 kc, _ := MakeKeepClient()
271 kc.ApiToken = "abc123"
272 service_roots := make([]string, 5)
274 ks := RunSomeFakeKeepServers(st, 5, 2990)
276 for i := 0; i < len(ks); i += 1 {
277 service_roots[i] = ks[i].url
278 defer ks[i].listener.Close()
281 kc.SetServiceRoots(service_roots)
283 kc.PutB([]byte("foo"))
285 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
289 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
290 (s1 == shuff[1] && s2 == shuff[0]),
294 log.Printf("TestPutB done")
297 func (s *StandaloneSuite) TestPutHR(c *C) {
298 log.Printf("TestPutHR")
300 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
302 st := StubPutHandler{
307 make(chan string, 2)}
309 kc, _ := MakeKeepClient()
312 kc.ApiToken = "abc123"
313 service_roots := make([]string, 5)
315 ks := RunSomeFakeKeepServers(st, 5, 2990)
317 for i := 0; i < len(ks); i += 1 {
318 service_roots[i] = ks[i].url
319 defer ks[i].listener.Close()
322 kc.SetServiceRoots(service_roots)
324 reader, writer := io.Pipe()
327 writer.Write([]byte("foo"))
331 kc.PutHR(hash, reader, 3)
333 shuff := kc.shuffledServiceRoots(hash)
339 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
340 (s1 == shuff[1] && s2 == shuff[0]),
344 log.Printf("TestPutHR done")
347 func (s *StandaloneSuite) TestPutWithFail(c *C) {
348 log.Printf("TestPutWithFail")
350 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
352 st := StubPutHandler{
357 make(chan string, 2)}
360 make(chan string, 1)}
362 kc, _ := MakeKeepClient()
365 kc.ApiToken = "abc123"
366 service_roots := make([]string, 5)
368 ks1 := RunSomeFakeKeepServers(st, 4, 2990)
369 ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
371 for i, k := range ks1 {
372 service_roots[i] = k.url
373 defer k.listener.Close()
375 for i, k := range ks2 {
376 service_roots[len(ks1)+i] = k.url
377 defer k.listener.Close()
380 kc.SetServiceRoots(service_roots)
382 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
384 phash, replicas, err := kc.PutB([]byte("foo"))
388 c.Check(err, Equals, nil)
389 c.Check(phash, Equals, hash)
390 c.Check(replicas, Equals, 2)
391 c.Check(<-st.handled, Equals, shuff[1])
392 c.Check(<-st.handled, Equals, shuff[2])
395 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
396 log.Printf("TestPutWithTooManyFail")
398 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
400 st := StubPutHandler{
405 make(chan string, 1)}
408 make(chan string, 4)}
410 kc, _ := MakeKeepClient()
413 kc.ApiToken = "abc123"
414 service_roots := make([]string, 5)
416 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
417 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
419 for i, k := range ks1 {
420 service_roots[i] = k.url
421 defer k.listener.Close()
423 for i, k := range ks2 {
424 service_roots[len(ks1)+i] = k.url
425 defer k.listener.Close()
428 kc.SetServiceRoots(service_roots)
430 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
432 _, replicas, err := kc.PutB([]byte("foo"))
434 c.Check(err, Equals, InsufficientReplicasError)
435 c.Check(replicas, Equals, 1)
436 c.Check(<-st.handled, Equals, shuff[1])
438 log.Printf("TestPutWithTooManyFail done")
441 type StubGetHandler struct {
444 expectApiToken string
448 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
449 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
450 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
451 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
452 resp.Write(this.returnBody)
455 func (s *StandaloneSuite) TestGet(c *C) {
456 log.Printf("TestGet")
458 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
460 st := StubGetHandler{
466 listener, url := RunBogusKeepServer(st, 2990)
467 defer listener.Close()
469 kc, _ := MakeKeepClient()
470 kc.ApiToken = "abc123"
471 kc.SetServiceRoots([]string{url})
473 r, n, url2, err := kc.Get(hash)
475 c.Check(err, Equals, nil)
476 c.Check(n, Equals, int64(3))
477 c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
479 content, err2 := ioutil.ReadAll(r)
480 c.Check(err2, Equals, nil)
481 c.Check(content, DeepEquals, []byte("foo"))
483 log.Printf("TestGet done")
486 func (s *StandaloneSuite) TestGetFail(c *C) {
487 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
489 st := FailHandler{make(chan string, 1)}
491 listener, url := RunBogusKeepServer(st, 2990)
492 defer listener.Close()
494 kc, _ := MakeKeepClient()
495 kc.ApiToken = "abc123"
496 kc.SetServiceRoots([]string{url})
498 r, n, url2, err := kc.Get(hash)
499 c.Check(err, Equals, BlockNotFound)
500 c.Check(n, Equals, int64(0))
501 c.Check(url2, Equals, "")
502 c.Check(r, Equals, nil)
505 type BarHandler struct {
509 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
510 resp.Write([]byte("bar"))
511 this.handled <- fmt.Sprintf("http://%s", req.Host)
514 func (s *StandaloneSuite) TestChecksum(c *C) {
515 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
516 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
518 st := BarHandler{make(chan string, 1)}
520 listener, url := RunBogusKeepServer(st, 2990)
521 defer listener.Close()
523 kc, _ := MakeKeepClient()
524 kc.ApiToken = "abc123"
525 kc.SetServiceRoots([]string{url})
527 r, n, _, err := kc.Get(barhash)
528 _, err = ioutil.ReadAll(r)
529 c.Check(n, Equals, int64(3))
530 c.Check(err, Equals, nil)
534 r, n, _, err = kc.Get(foohash)
535 _, err = ioutil.ReadAll(r)
536 c.Check(n, Equals, int64(3))
537 c.Check(err, Equals, BadChecksum)
542 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
544 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
547 make(chan string, 1)}
549 st := StubGetHandler{
555 kc, _ := MakeKeepClient()
556 kc.ApiToken = "abc123"
557 service_roots := make([]string, 5)
559 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
560 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
562 for i, k := range ks1 {
563 service_roots[i] = k.url
564 defer k.listener.Close()
566 for i, k := range ks2 {
567 service_roots[len(ks1)+i] = k.url
568 defer k.listener.Close()
571 kc.SetServiceRoots(service_roots)
573 r, n, url2, err := kc.Get(hash)
575 c.Check(err, Equals, nil)
576 c.Check(n, Equals, int64(3))
577 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
579 content, err2 := ioutil.ReadAll(r)
580 c.Check(err2, Equals, nil)
581 c.Check(content, DeepEquals, []byte("foo"))
584 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
585 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
586 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
587 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
589 kc, err := MakeKeepClient()
590 c.Assert(err, Equals, nil)
592 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
595 n, _, err := kc.Ask(hash)
596 c.Check(err, Equals, BlockNotFound)
597 c.Check(n, Equals, int64(0))
600 hash2, replicas, err := kc.PutB([]byte("foo"))
601 c.Check(hash2, Equals, hash)
602 c.Check(replicas, Equals, 2)
603 c.Check(err, Equals, nil)
606 r, n, url2, err := kc.Get(hash)
607 c.Check(err, Equals, nil)
608 c.Check(n, Equals, int64(3))
609 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
611 content, err2 := ioutil.ReadAll(r)
612 c.Check(err2, Equals, nil)
613 c.Check(content, DeepEquals, []byte("foo"))
616 n, url2, err := kc.Ask(hash)
617 c.Check(err, Equals, nil)
618 c.Check(n, Equals, int64(3))
619 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
623 type StubProxyHandler struct {
627 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
628 resp.Header().Set("X-Keep-Replicas-Stored", "2")
629 this.handled <- fmt.Sprintf("http://%s", req.Host)
632 func (s *StandaloneSuite) TestPutProxy(c *C) {
633 log.Printf("TestPutProxy")
635 st := StubProxyHandler{make(chan string, 1)}
637 kc, _ := MakeKeepClient()
640 kc.Using_proxy = true
641 kc.ApiToken = "abc123"
642 service_roots := make([]string, 1)
644 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
646 for i, k := range ks1 {
647 service_roots[i] = k.url
648 defer k.listener.Close()
651 kc.SetServiceRoots(service_roots)
653 _, replicas, err := kc.PutB([]byte("foo"))
656 c.Check(err, Equals, nil)
657 c.Check(replicas, Equals, 2)
659 log.Printf("TestPutProxy done")
662 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
663 log.Printf("TestPutProxy")
665 st := StubProxyHandler{make(chan string, 1)}
667 kc, _ := MakeKeepClient()
670 kc.Using_proxy = true
671 kc.ApiToken = "abc123"
672 service_roots := make([]string, 1)
674 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
676 for i, k := range ks1 {
677 service_roots[i] = k.url
678 defer k.listener.Close()
680 kc.SetServiceRoots(service_roots)
682 _, replicas, err := kc.PutB([]byte("foo"))
685 c.Check(err, Equals, InsufficientReplicasError)
686 c.Check(replicas, Equals, 2)
688 log.Printf("TestPutProxy done")