21 // Gocheck boilerplate
22 func Test(t *testing.T) {
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
30 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
32 // Tests that require the Keep server running
33 type ServerRequiredSuite struct{}
36 type StandaloneSuite struct{}
38 func pythonDir() string {
39 gopath := os.Getenv("GOPATH")
40 return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
43 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
45 c.Skip("Skipping tests that require server")
48 exec.Command("python", "run_test_server.py", "start").Run()
49 exec.Command("python", "run_test_server.py", "start_keep").Run()
53 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
55 exec.Command("python", "run_test_server.py", "stop_keep").Run()
56 exec.Command("python", "run_test_server.py", "stop").Run()
59 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
60 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
61 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
62 os.Setenv("ARVADOS_API_HOST_INSECURE", "")
64 kc, err := MakeKeepClient()
65 c.Check(kc.ApiServer, Equals, "localhost:3001")
66 c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
67 c.Check(kc.ApiInsecure, Equals, false)
69 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
71 kc, err = MakeKeepClient()
72 c.Check(kc.ApiServer, Equals, "localhost:3001")
73 c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
74 c.Check(kc.ApiInsecure, Equals, true)
75 c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
77 c.Assert(err, Equals, nil)
78 c.Check(len(kc.Service_roots), Equals, 2)
79 c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
80 c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
83 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
84 kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
86 // "foo" acbd18db4cc2f85cedef654fccc4a4d8
87 foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
88 c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
90 // "bar" 37b51d194a7513e45b56f6524f2d51f2
91 bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
92 c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
95 type StubPutHandler struct {
103 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
104 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
105 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
106 body, err := ioutil.ReadAll(req.Body)
107 this.c.Check(err, Equals, nil)
108 this.c.Check(body, DeepEquals, []byte(this.expectBody))
109 resp.WriteHeader(200)
110 this.handled <- fmt.Sprintf("http://%s", req.Host)
113 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
114 server := http.Server{Handler: st}
117 listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
119 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
122 url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
124 go server.Serve(listener)
128 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
129 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
131 listener, url := RunBogusKeepServer(st, 2990)
132 defer listener.Close()
134 kc, _ := MakeKeepClient()
135 kc.ApiToken = "abc123"
137 reader, writer := io.Pipe()
138 upload_status := make(chan uploadStatus)
140 f(kc, url, reader, writer, upload_status)
143 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
144 log.Printf("TestUploadToStubKeepServer")
146 st := StubPutHandler{
148 "acbd18db4cc2f85cedef654fccc4a4d8",
153 UploadToStubHelper(c, st,
154 func(kc KeepClient, url string, reader io.ReadCloser,
155 writer io.WriteCloser, upload_status chan uploadStatus) {
157 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
159 writer.Write([]byte("foo"))
163 status := <-upload_status
164 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
167 log.Printf("TestUploadToStubKeepServer done")
170 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
171 log.Printf("TestUploadToStubKeepServerBufferReader")
173 st := StubPutHandler{
175 "acbd18db4cc2f85cedef654fccc4a4d8",
180 UploadToStubHelper(c, st,
181 func(kc KeepClient, url string, reader io.ReadCloser,
182 writer io.WriteCloser, upload_status chan uploadStatus) {
184 tr := streamer.AsyncStreamFromReader(512, reader)
187 br1 := tr.MakeStreamReader()
189 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
191 writer.Write([]byte("foo"))
196 status := <-upload_status
197 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
200 log.Printf("TestUploadToStubKeepServerBufferReader done")
203 type FailHandler struct {
207 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
208 resp.WriteHeader(500)
209 this.handled <- fmt.Sprintf("http://%s", req.Host)
212 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
213 log.Printf("TestFailedUploadToStubKeepServer")
218 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
220 UploadToStubHelper(c, st,
221 func(kc KeepClient, url string, reader io.ReadCloser,
222 writer io.WriteCloser, upload_status chan uploadStatus) {
224 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
226 writer.Write([]byte("foo"))
231 status := <-upload_status
232 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
233 c.Check(status.statusCode, Equals, 500)
235 log.Printf("TestFailedUploadToStubKeepServer done")
238 type KeepServer struct {
239 listener net.Listener
243 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
244 ks = make([]KeepServer, n)
246 for i := 0; i < n; i += 1 {
247 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
248 ks[i] = KeepServer{boguslistener, bogusurl}
254 func (s *StandaloneSuite) TestPutB(c *C) {
255 log.Printf("TestPutB")
257 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
259 st := StubPutHandler{
264 make(chan string, 2)}
266 kc, _ := MakeKeepClient()
269 kc.ApiToken = "abc123"
270 kc.Service_roots = make([]string, 5)
272 ks := RunSomeFakeKeepServers(st, 5, 2990)
274 for i := 0; i < len(ks); i += 1 {
275 kc.Service_roots[i] = ks[i].url
276 defer ks[i].listener.Close()
279 sort.Strings(kc.Service_roots)
281 kc.PutB([]byte("foo"))
283 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
287 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
288 (s1 == shuff[1] && s2 == shuff[0]),
292 log.Printf("TestPutB done")
295 func (s *StandaloneSuite) TestPutHR(c *C) {
296 log.Printf("TestPutHR")
298 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
300 st := StubPutHandler{
305 make(chan string, 2)}
307 kc, _ := MakeKeepClient()
310 kc.ApiToken = "abc123"
311 kc.Service_roots = make([]string, 5)
313 ks := RunSomeFakeKeepServers(st, 5, 2990)
315 for i := 0; i < len(ks); i += 1 {
316 kc.Service_roots[i] = ks[i].url
317 defer ks[i].listener.Close()
320 sort.Strings(kc.Service_roots)
322 reader, writer := io.Pipe()
325 writer.Write([]byte("foo"))
329 kc.PutHR(hash, reader, 3)
331 shuff := kc.shuffledServiceRoots(hash)
337 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
338 (s1 == shuff[1] && s2 == shuff[0]),
342 log.Printf("TestPutHR done")
345 func (s *StandaloneSuite) TestPutWithFail(c *C) {
346 log.Printf("TestPutWithFail")
348 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
350 st := StubPutHandler{
355 make(chan string, 2)}
358 make(chan string, 1)}
360 kc, _ := MakeKeepClient()
363 kc.ApiToken = "abc123"
364 kc.Service_roots = make([]string, 5)
366 ks1 := RunSomeFakeKeepServers(st, 4, 2990)
367 ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
369 for i, k := range ks1 {
370 kc.Service_roots[i] = k.url
371 defer k.listener.Close()
373 for i, k := range ks2 {
374 kc.Service_roots[len(ks1)+i] = k.url
375 defer k.listener.Close()
378 sort.Strings(kc.Service_roots)
380 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
382 phash, replicas, err := kc.PutB([]byte("foo"))
386 c.Check(err, Equals, nil)
387 c.Check(phash, Equals, hash)
388 c.Check(replicas, Equals, 2)
389 c.Check(<-st.handled, Equals, shuff[1])
390 c.Check(<-st.handled, Equals, shuff[2])
393 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
394 log.Printf("TestPutWithTooManyFail")
396 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
398 st := StubPutHandler{
403 make(chan string, 1)}
406 make(chan string, 4)}
408 kc, _ := MakeKeepClient()
411 kc.ApiToken = "abc123"
412 kc.Service_roots = make([]string, 5)
414 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
415 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
417 for i, k := range ks1 {
418 kc.Service_roots[i] = k.url
419 defer k.listener.Close()
421 for i, k := range ks2 {
422 kc.Service_roots[len(ks1)+i] = k.url
423 defer k.listener.Close()
426 sort.Strings(kc.Service_roots)
428 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
430 _, replicas, err := kc.PutB([]byte("foo"))
432 c.Check(err, Equals, InsufficientReplicasError)
433 c.Check(replicas, Equals, 1)
434 c.Check(<-st.handled, Equals, shuff[1])
436 log.Printf("TestPutWithTooManyFail done")
439 type StubGetHandler struct {
442 expectApiToken string
446 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
447 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
448 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
449 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
450 resp.Write(this.returnBody)
453 func (s *StandaloneSuite) TestGet(c *C) {
454 log.Printf("TestGet")
456 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
458 st := StubGetHandler{
464 listener, url := RunBogusKeepServer(st, 2990)
465 defer listener.Close()
467 kc, _ := MakeKeepClient()
468 kc.ApiToken = "abc123"
469 kc.Service_roots = []string{url}
471 r, n, url2, err := kc.Get(hash)
473 c.Check(err, Equals, nil)
474 c.Check(n, Equals, int64(3))
475 c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
477 content, err2 := ioutil.ReadAll(r)
478 c.Check(err2, Equals, nil)
479 c.Check(content, DeepEquals, []byte("foo"))
481 log.Printf("TestGet done")
484 func (s *StandaloneSuite) TestGetFail(c *C) {
485 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
487 st := FailHandler{make(chan string, 1)}
489 listener, url := RunBogusKeepServer(st, 2990)
490 defer listener.Close()
492 kc, _ := MakeKeepClient()
493 kc.ApiToken = "abc123"
494 kc.Service_roots = []string{url}
496 r, n, url2, err := kc.Get(hash)
497 c.Check(err, Equals, BlockNotFound)
498 c.Check(n, Equals, int64(0))
499 c.Check(url2, Equals, "")
500 c.Check(r, Equals, nil)
503 type BarHandler struct {
507 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
508 resp.Write([]byte("bar"))
509 this.handled <- fmt.Sprintf("http://%s", req.Host)
512 func (s *StandaloneSuite) TestChecksum(c *C) {
513 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
514 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
516 st := BarHandler{make(chan string, 1)}
518 listener, url := RunBogusKeepServer(st, 2990)
519 defer listener.Close()
521 kc, _ := MakeKeepClient()
522 kc.ApiToken = "abc123"
523 kc.Service_roots = []string{url}
525 r, n, _, err := kc.Get(barhash)
526 _, err = ioutil.ReadAll(r)
527 c.Check(n, Equals, int64(3))
528 c.Check(err, Equals, nil)
532 r, n, _, err = kc.Get(foohash)
533 _, err = ioutil.ReadAll(r)
534 c.Check(n, Equals, int64(3))
535 c.Check(err, Equals, BadChecksum)
540 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
542 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
545 make(chan string, 1)}
547 st := StubGetHandler{
553 kc, _ := MakeKeepClient()
554 kc.ApiToken = "abc123"
555 kc.Service_roots = make([]string, 5)
557 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
558 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
560 for i, k := range ks1 {
561 kc.Service_roots[i] = k.url
562 defer k.listener.Close()
564 for i, k := range ks2 {
565 kc.Service_roots[len(ks1)+i] = k.url
566 defer k.listener.Close()
569 sort.Strings(kc.Service_roots)
571 r, n, url2, err := kc.Get(hash)
573 c.Check(err, Equals, nil)
574 c.Check(n, Equals, int64(3))
575 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
577 content, err2 := ioutil.ReadAll(r)
578 c.Check(err2, Equals, nil)
579 c.Check(content, DeepEquals, []byte("foo"))
582 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
583 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
584 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
585 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
587 kc, err := MakeKeepClient()
588 c.Assert(err, Equals, nil)
590 hash, replicas, err := kc.PutB([]byte("foo"))
591 c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
592 c.Check(replicas, Equals, 2)
593 c.Check(err, Equals, nil)
596 r, n, url2, err := kc.Get(hash)
597 c.Check(err, Equals, nil)
598 c.Check(n, Equals, int64(3))
599 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
601 content, err2 := ioutil.ReadAll(r)
602 c.Check(err2, Equals, nil)
603 c.Check(content, DeepEquals, []byte("foo"))
607 n, url2, err := kc.Ask(hash)
608 c.Check(err, Equals, nil)
609 c.Check(n, Equals, int64(3))
610 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
614 type StubProxyHandler struct {
618 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
619 resp.Header().Set("X-Keep-Replicas-Stored", "2")
620 this.handled <- fmt.Sprintf("http://%s", req.Host)
623 func (s *StandaloneSuite) TestPutProxy(c *C) {
624 log.Printf("TestPutProxy")
626 st := StubProxyHandler{make(chan string, 1)}
628 kc, _ := MakeKeepClient()
631 kc.Using_proxy = true
632 kc.ApiToken = "abc123"
633 kc.Service_roots = make([]string, 1)
635 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
637 for i, k := range ks1 {
638 kc.Service_roots[i] = k.url
639 defer k.listener.Close()
642 _, replicas, err := kc.PutB([]byte("foo"))
645 c.Check(err, Equals, nil)
646 c.Check(replicas, Equals, 2)
648 log.Printf("TestPutProxy done")
651 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
652 log.Printf("TestPutProxy")
654 st := StubProxyHandler{make(chan string, 1)}
656 kc, _ := MakeKeepClient()
659 kc.Using_proxy = true
660 kc.ApiToken = "abc123"
661 kc.Service_roots = make([]string, 1)
663 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
665 for i, k := range ks1 {
666 kc.Service_roots[i] = k.url
667 defer k.listener.Close()
670 _, replicas, err := kc.PutB([]byte("foo"))
673 c.Check(err, Equals, InsufficientReplicasError)
674 c.Check(replicas, Equals, 2)
676 log.Printf("TestPutProxy done")