21 // Gocheck boilerplate
22 func Test(t *testing.T) {
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
30 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
32 // Tests that require the Keep server running
33 type ServerRequiredSuite struct{}
36 type StandaloneSuite struct{}
38 func pythonDir() string {
39 gopath := os.Getenv("GOPATH")
40 return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
43 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
45 c.Skip("Skipping tests that require server")
48 exec.Command("python", "run_test_server.py", "start").Run()
49 exec.Command("python", "run_test_server.py", "start_keep").Run()
53 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
55 exec.Command("python", "run_test_server.py", "stop_keep").Run()
56 exec.Command("python", "run_test_server.py", "stop").Run()
59 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
60 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
61 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
62 os.Setenv("ARVADOS_API_HOST_INSECURE", "")
64 kc, err := MakeKeepClient()
65 c.Check(kc.ApiServer, Equals, "localhost:3001")
66 c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
67 c.Check(kc.ApiInsecure, Equals, false)
69 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
71 kc, err = MakeKeepClient()
72 c.Check(kc.ApiServer, Equals, "localhost:3001")
73 c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
74 c.Check(kc.ApiInsecure, Equals, true)
75 c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
77 c.Assert(err, Equals, nil)
78 c.Check(len(kc.Service_roots), Equals, 2)
79 c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
80 c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
83 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
84 kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
86 // "foo" acbd18db4cc2f85cedef654fccc4a4d8
87 foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
88 c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
90 // "bar" 37b51d194a7513e45b56f6524f2d51f2
91 bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
92 c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
95 type StubPutHandler struct {
103 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
104 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
105 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
106 body, err := ioutil.ReadAll(req.Body)
107 this.c.Check(err, Equals, nil)
108 this.c.Check(body, DeepEquals, []byte(this.expectBody))
109 resp.WriteHeader(200)
110 this.handled <- fmt.Sprintf("http://%s", req.Host)
113 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
115 listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
117 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
120 url = fmt.Sprintf("http://localhost:%d", port)
122 go http.Serve(listener, st)
126 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
127 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
129 listener, url := RunBogusKeepServer(st, 2990)
130 defer listener.Close()
132 kc, _ := MakeKeepClient()
133 kc.ApiToken = "abc123"
135 reader, writer := io.Pipe()
136 upload_status := make(chan uploadStatus)
138 f(kc, url, reader, writer, upload_status)
141 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
142 log.Printf("TestUploadToStubKeepServer")
144 st := StubPutHandler{
146 "acbd18db4cc2f85cedef654fccc4a4d8",
151 UploadToStubHelper(c, st,
152 func(kc KeepClient, url string, reader io.ReadCloser,
153 writer io.WriteCloser, upload_status chan uploadStatus) {
155 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
157 writer.Write([]byte("foo"))
161 status := <-upload_status
162 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
165 log.Printf("TestUploadToStubKeepServer done")
168 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
169 log.Printf("TestUploadToStubKeepServerBufferReader")
171 st := StubPutHandler{
173 "acbd18db4cc2f85cedef654fccc4a4d8",
178 UploadToStubHelper(c, st,
179 func(kc KeepClient, url string, reader io.ReadCloser,
180 writer io.WriteCloser, upload_status chan uploadStatus) {
182 tr := streamer.AsyncStreamFromReader(512, reader)
185 br1 := tr.MakeStreamReader()
187 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
189 writer.Write([]byte("foo"))
194 status := <-upload_status
195 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
198 log.Printf("TestUploadToStubKeepServerBufferReader done")
201 type FailHandler struct {
205 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
206 resp.WriteHeader(500)
207 this.handled <- fmt.Sprintf("http://%s", req.Host)
210 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
211 log.Printf("TestFailedUploadToStubKeepServer")
216 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
218 UploadToStubHelper(c, st,
219 func(kc KeepClient, url string, reader io.ReadCloser,
220 writer io.WriteCloser, upload_status chan uploadStatus) {
222 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
224 writer.Write([]byte("foo"))
229 status := <-upload_status
230 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
231 c.Check(status.statusCode, Equals, 500)
233 log.Printf("TestFailedUploadToStubKeepServer done")
236 type KeepServer struct {
237 listener net.Listener
241 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
242 ks = make([]KeepServer, n)
244 for i := 0; i < n; i += 1 {
245 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
246 ks[i] = KeepServer{boguslistener, bogusurl}
252 func (s *StandaloneSuite) TestPutB(c *C) {
253 log.Printf("TestPutB")
255 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
257 st := StubPutHandler{
262 make(chan string, 2)}
264 kc, _ := MakeKeepClient()
267 kc.ApiToken = "abc123"
268 kc.Service_roots = make([]string, 5)
270 ks := RunSomeFakeKeepServers(st, 5, 2990)
272 for i := 0; i < len(ks); i += 1 {
273 kc.Service_roots[i] = ks[i].url
274 defer ks[i].listener.Close()
277 sort.Strings(kc.Service_roots)
279 kc.PutB([]byte("foo"))
281 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
285 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
286 (s1 == shuff[1] && s2 == shuff[0]),
290 log.Printf("TestPutB done")
293 func (s *StandaloneSuite) TestPutHR(c *C) {
294 log.Printf("TestPutHR")
296 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
298 st := StubPutHandler{
303 make(chan string, 2)}
305 kc, _ := MakeKeepClient()
308 kc.ApiToken = "abc123"
309 kc.Service_roots = make([]string, 5)
311 ks := RunSomeFakeKeepServers(st, 5, 2990)
313 for i := 0; i < len(ks); i += 1 {
314 kc.Service_roots[i] = ks[i].url
315 defer ks[i].listener.Close()
318 sort.Strings(kc.Service_roots)
320 reader, writer := io.Pipe()
323 writer.Write([]byte("foo"))
327 kc.PutHR(hash, reader, 3)
329 shuff := kc.shuffledServiceRoots(hash)
335 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
336 (s1 == shuff[1] && s2 == shuff[0]),
340 log.Printf("TestPutHR done")
343 func (s *StandaloneSuite) TestPutWithFail(c *C) {
344 log.Printf("TestPutWithFail")
346 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
348 st := StubPutHandler{
353 make(chan string, 2)}
356 make(chan string, 1)}
358 kc, _ := MakeKeepClient()
361 kc.ApiToken = "abc123"
362 kc.Service_roots = make([]string, 5)
364 ks1 := RunSomeFakeKeepServers(st, 4, 2990)
365 ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
367 for i, k := range ks1 {
368 kc.Service_roots[i] = k.url
369 defer k.listener.Close()
371 for i, k := range ks2 {
372 kc.Service_roots[len(ks1)+i] = k.url
373 defer k.listener.Close()
376 sort.Strings(kc.Service_roots)
378 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
380 phash, replicas, err := kc.PutB([]byte("foo"))
384 c.Check(err, Equals, nil)
385 c.Check(phash, Equals, hash)
386 c.Check(replicas, Equals, 2)
387 c.Check(<-st.handled, Equals, shuff[1])
388 c.Check(<-st.handled, Equals, shuff[2])
391 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
392 log.Printf("TestPutWithTooManyFail")
394 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
396 st := StubPutHandler{
401 make(chan string, 1)}
404 make(chan string, 4)}
406 kc, _ := MakeKeepClient()
409 kc.ApiToken = "abc123"
410 kc.Service_roots = make([]string, 5)
412 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
413 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
415 for i, k := range ks1 {
416 kc.Service_roots[i] = k.url
417 defer k.listener.Close()
419 for i, k := range ks2 {
420 kc.Service_roots[len(ks1)+i] = k.url
421 defer k.listener.Close()
424 sort.Strings(kc.Service_roots)
426 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
428 _, replicas, err := kc.PutB([]byte("foo"))
430 c.Check(err, Equals, InsufficientReplicasError)
431 c.Check(replicas, Equals, 1)
432 c.Check(<-st.handled, Equals, shuff[1])
434 log.Printf("TestPutWithTooManyFail done")
437 type StubGetHandler struct {
440 expectApiToken string
444 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
445 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
446 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
447 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
448 resp.Write(this.returnBody)
451 func (s *StandaloneSuite) TestGet(c *C) {
452 log.Printf("TestGet")
454 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
456 st := StubGetHandler{
462 listener, url := RunBogusKeepServer(st, 2990)
463 defer listener.Close()
465 kc, _ := MakeKeepClient()
466 kc.ApiToken = "abc123"
467 kc.Service_roots = []string{url}
469 r, n, url2, err := kc.Get(hash)
471 c.Check(err, Equals, nil)
472 c.Check(n, Equals, int64(3))
473 c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
475 content, err2 := ioutil.ReadAll(r)
476 c.Check(err2, Equals, nil)
477 c.Check(content, DeepEquals, []byte("foo"))
479 log.Printf("TestGet done")
482 func (s *StandaloneSuite) TestGetFail(c *C) {
483 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
485 st := FailHandler{make(chan string, 1)}
487 listener, url := RunBogusKeepServer(st, 2990)
488 defer listener.Close()
490 kc, _ := MakeKeepClient()
491 kc.ApiToken = "abc123"
492 kc.Service_roots = []string{url}
494 r, n, url2, err := kc.Get(hash)
495 c.Check(err, Equals, BlockNotFound)
496 c.Check(n, Equals, int64(0))
497 c.Check(url2, Equals, "")
498 c.Check(r, Equals, nil)
501 type BarHandler struct {
505 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
506 resp.Write([]byte("bar"))
507 this.handled <- fmt.Sprintf("http://%s", req.Host)
510 func (s *StandaloneSuite) TestChecksum(c *C) {
511 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
512 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
514 st := BarHandler{make(chan string, 1)}
516 listener, url := RunBogusKeepServer(st, 2990)
517 defer listener.Close()
519 kc, _ := MakeKeepClient()
520 kc.ApiToken = "abc123"
521 kc.Service_roots = []string{url}
523 r, n, _, err := kc.Get(barhash)
524 _, err = ioutil.ReadAll(r)
525 c.Check(n, Equals, int64(3))
526 c.Check(err, Equals, nil)
530 r, n, _, err = kc.Get(foohash)
531 _, err = ioutil.ReadAll(r)
532 c.Check(n, Equals, int64(3))
533 c.Check(err, Equals, BadChecksum)
538 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
540 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
543 make(chan string, 1)}
545 st := StubGetHandler{
551 kc, _ := MakeKeepClient()
552 kc.ApiToken = "abc123"
553 kc.Service_roots = make([]string, 5)
555 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
556 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
558 for i, k := range ks1 {
559 kc.Service_roots[i] = k.url
560 defer k.listener.Close()
562 for i, k := range ks2 {
563 kc.Service_roots[len(ks1)+i] = k.url
564 defer k.listener.Close()
567 sort.Strings(kc.Service_roots)
569 r, n, url2, err := kc.Get(hash)
571 c.Check(err, Equals, nil)
572 c.Check(n, Equals, int64(3))
573 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
575 content, err2 := ioutil.ReadAll(r)
576 c.Check(err2, Equals, nil)
577 c.Check(content, DeepEquals, []byte("foo"))
580 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
581 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
582 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
583 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
585 kc, err := MakeKeepClient()
586 c.Assert(err, Equals, nil)
588 hash, replicas, err := kc.PutB([]byte("foo"))
589 c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
590 c.Check(replicas, Equals, 2)
591 c.Check(err, Equals, nil)
594 r, n, url2, err := kc.Get(hash)
595 c.Check(err, Equals, nil)
596 c.Check(n, Equals, int64(3))
597 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
599 content, err2 := ioutil.ReadAll(r)
600 c.Check(err2, Equals, nil)
601 c.Check(content, DeepEquals, []byte("foo"))
605 n, url2, err := kc.Ask(hash)
606 c.Check(err, Equals, nil)
607 c.Check(n, Equals, int64(3))
608 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
612 type StubProxyHandler struct {
616 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
617 resp.Header().Set("X-Keep-Replicas-Stored", "2")
618 this.handled <- fmt.Sprintf("http://%s", req.Host)
621 func (s *StandaloneSuite) TestPutProxy(c *C) {
622 log.Printf("TestPutProxy")
624 st := StubProxyHandler{make(chan string, 1)}
626 kc, _ := MakeKeepClient()
629 kc.Using_proxy = true
630 kc.ApiToken = "abc123"
631 kc.Service_roots = make([]string, 1)
633 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
635 for i, k := range ks1 {
636 kc.Service_roots[i] = k.url
637 defer k.listener.Close()
640 _, replicas, err := kc.PutB([]byte("foo"))
643 c.Check(err, Equals, nil)
644 c.Check(replicas, Equals, 2)
646 log.Printf("TestPutProxy done")
649 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
650 log.Printf("TestPutProxy")
652 st := StubProxyHandler{make(chan string, 1)}
654 kc, _ := MakeKeepClient()
657 kc.Using_proxy = true
658 kc.ApiToken = "abc123"
659 kc.Service_roots = make([]string, 1)
661 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
663 for i, k := range ks1 {
664 kc.Service_roots[i] = k.url
665 defer k.listener.Close()
668 _, replicas, err := kc.PutB([]byte("foo"))
671 c.Check(err, Equals, InsufficientReplicasError)
672 c.Check(replicas, Equals, 2)
674 log.Printf("TestPutProxy done")