21 // Gocheck boilerplate
// Test hands the *testing.T it receives to TestingT.
// NOTE(review): presumably this is what lets "go test" run every suite
// registered through Suite — confirm against the gocheck documentation.
22 func Test(t *testing.T) { TestingT(t) }
24 // Gocheck boilerplate
// One instance of each suite type is passed to Suite; the results are
// discarded, so only the side effect of the call matters.
25 var _ = Suite(&ServerRequiredSuite{})
26 var _ = Suite(&StandaloneSuite{})
// no_server is the boolean "-no-server" command-line flag (default false).
// NOTE(review): the help text says 'ServerRequireSuite' while the type is
// named ServerRequiredSuite; presumably SetUpSuite consults this flag to
// decide whether to skip — confirm.
28 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
30 // Tests that require the Keep server running
// ServerRequiredSuite carries no state; its SetUpSuite and TearDownSuite
// methods drive run_test_server.py to start and stop the servers.
31 type ServerRequiredSuite struct{}
// StandaloneSuite carries no state; the tests attached to it either need no
// server at all or talk to stub HTTP handlers served by RunBogusKeepServer.
34 type StandaloneSuite struct{}
// pythonDir returns "<first GOPATH entry>/../python".
// It reads the GOPATH environment variable, keeps the text before the first
// ':' and appends "/../python". The path is neither cleaned nor checked for
// existence; with GOPATH unset the result is "/../python".
36 func pythonDir() string {
37 gopath := os.Getenv("GOPATH")
38 return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
// SetUpSuite is the suite-level setup fixture of ServerRequiredSuite.
// Two outcomes are visible: the suite is skipped through c.Skip, or the
// helper script is run with "start" and then with "start_keep".
// NOTE(review): presumably the skip is conditional on the no-server flag —
// confirm.
41 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
43 c.Skip("Skipping tests that require server")
// Run waits for each python process to exit. Its error result is dropped, so
// a failed launch is not reported at this point.
// NOTE(review): the script name is relative — assumes the working directory
// is the python directory (see pythonDir); confirm.
46 exec.Command("python", "run_test_server.py", "start").Run()
47 exec.Command("python", "run_test_server.py", "start_keep").Run()
// TearDownSuite is the suite-level teardown fixture of ServerRequiredSuite.
// It runs the helper script with "stop_keep" and then "stop", the reverse of
// the order used by SetUpSuite. The error results of Run are dropped.
// NOTE(review): the script name is relative — assumes the working directory
// is the python directory; confirm.
51 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
53 exec.Command("python", "run_test_server.py", "stop_keep").Run()
54 exec.Command("python", "run_test_server.py", "stop").Run()
// TestMakeKeepClient checks that MakeKeepClient picks up its settings from
// the ARVADOS_API_HOST, ARVADOS_API_TOKEN and ARVADOS_API_HOST_INSECURE
// environment variables, and that the resulting client lists exactly two
// service roots (ports 25107 and 25108 on localhost).
57 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
58 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
59 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
// First pass: an empty ARVADOS_API_HOST_INSECURE is expected to yield
// ApiInsecure == false.
60 os.Setenv("ARVADOS_API_HOST_INSECURE", "")
// NOTE(review): in the statements shown, err from this first call is
// overwritten by the second call before it is ever checked, and kc is
// dereferenced before err is examined.
62 kc, err := MakeKeepClient()
63 c.Assert(kc.ApiServer, Equals, "localhost:3001")
64 c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
65 c.Assert(kc.ApiInsecure, Equals, false)
// Second pass: "true" is expected to yield ApiInsecure == true.
67 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
69 kc, err = MakeKeepClient()
70 c.Assert(kc.ApiServer, Equals, "localhost:3001")
71 c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
72 c.Assert(kc.ApiInsecure, Equals, true)
// Only the error of the second call is verified here.
74 c.Assert(err, Equals, nil)
75 c.Assert(len(kc.Service_roots), Equals, 2)
76 c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
77 c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
// TestShuffleServiceRoots pins the output of ShuffledServiceRoots for two
// fixed hashes. The client is built directly as a literal with 17 service
// roots (localhost ports 25107 through 25123), so no server is contacted by
// the statements in this test.
80 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
81 kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
83 // "foo" acbd18db4cc2f85cedef654fccc4a4d8
// Expected permutation of all 17 roots for the hash above.
84 foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
85 c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
87 // "bar" 37b51d194a7513e45b56f6524f2d51f2
// A different hash must give a different, equally fixed, permutation.
88 bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
89 c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
// StubPutHandler is the stub HTTP handler used by the upload tests. Its
// ServeHTTP method reads the fields c, expectPath, expectApiToken,
// expectBody and handled.
92 type StubPutHandler struct {
// ServeHTTP verifies one incoming request and answers 200.
// Through this.c it checks that the URL path is "/" + expectPath, that the
// Authorization header is "OAuth2 " + expectApiToken, and that the complete
// request body equals expectBody. Failed checks are reported via c.Check; the
// response status is 200 regardless.
100 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
101 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
102 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
103 body, err := ioutil.ReadAll(req.Body)
104 this.c.Check(err, Equals, nil)
105 this.c.Check(body, DeepEquals, []byte(this.expectBody))
106 resp.WriteHeader(200)
// Tell the test which server was hit, as "http://<Host of the request>".
// The send blocks until the channel has a free buffer slot or a receiver.
107 this.handled <- fmt.Sprintf("http://%s", req.Host)
// RunBogusKeepServer serves the handler st over HTTP on the given TCP port.
//
// Results: the listener (the caller closes it to stop accepting connections)
// and the base URL "http://localhost:<port>", where <port> is read back from
// the listener's address.
110 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
111 server := http.Server{Handler: st}
// Only Port is set in the TCPAddr; no IP is given.
114 listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
// NOTE(review): presumably reached only when ListenTCP returned an error —
// confirm. The panic message names the port but not the underlying error.
116 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
119 url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
// Serve runs in its own goroutine so this function can return right away;
// the value Serve returns is not observed.
121 go server.Serve(listener)
// UploadToStubHelper sets up the fixture shared by the single-server upload
// tests and then calls f with it.
//
// It serves st on port 2990 (the listener is closed when this function
// returns), builds a client with MakeKeepClient whose ApiToken is forced to
// "abc123", creates an io.Pipe and an unbuffered UploadStatus channel, and
// passes client, server URL, pipe reader, pipe writer and channel to f.
125 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
126 io.ReadCloser, io.WriteCloser, chan UploadStatus)) {
128 listener, url := RunBogusKeepServer(st, 2990)
129 defer listener.Close()
// The error result of MakeKeepClient is discarded.
131 kc, _ := MakeKeepClient()
132 kc.ApiToken = "abc123"
134 reader, writer := io.Pipe()
135 upload_status := make(chan UploadStatus)
137 f(kc, url, reader, writer, upload_status)
// TestUploadToStubKeepServer uploads "foo" to a single StubPutHandler through
// uploadToKeepServer and expects an UploadStatus with a nil error, the URL
// "<server url>/<expectPath>" and status code 200.
140 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
141 st := StubPutHandler{
// md5 of "foo" (the same value is labelled that way in
// TestShuffleServiceRoots).
143 "acbd18db4cc2f85cedef654fccc4a4d8",
148 UploadToStubHelper(c, st,
149 func(kc KeepClient, url string, reader io.ReadCloser,
150 writer io.WriteCloser, upload_status chan UploadStatus) {
// The upload runs in its own goroutine: it reads from the pipe while this
// goroutine writes to it, and a pipe write blocks until it is consumed.
152 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
154 writer.Write([]byte("foo"))
158 status := <-upload_status
159 c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
// TestUploadToStubKeepServerBufferReader repeats the single-server upload of
// "foo", but the uploader reads from a reader created by
// buffer.MakeBufferReader, fed by buffer.Transfer from the pipe, instead of
// reading the pipe directly. The expected UploadStatus is the same: nil
// error, "<server url>/<expectPath>", 200.
163 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
164 st := StubPutHandler{
166 "acbd18db4cc2f85cedef654fccc4a4d8",
171 UploadToStubHelper(c, st,
172 func(kc KeepClient, url string, reader io.ReadCloser,
173 writer io.WriteCloser, upload_status chan UploadStatus) {
175 // Buffer for reads from 'r'
176 buf := make([]byte, 512)
178 // Read requests on Transfer() buffer
// The channel is closed when this closure returns.
179 requests := make(chan buffer.ReadRequest)
180 defer close(requests)
182 // Reporting reader error states
183 reader_status := make(chan error)
// Transfer runs concurrently, taking data from the pipe reader into buf.
185 go buffer.Transfer(buf, reader, requests, reader_status)
187 br1 := buffer.MakeBufferReader(requests)
// Expected size 3 == len("foo").
189 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
191 writer.Write([]byte("foo"))
197 status := <-upload_status
198 c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
200 //c.Check(true, Equals, false)
// FailHandler is a stub HTTP handler that always answers 400. Its ServeHTTP
// method uses the field handled, a channel on which it reports the host of
// each request.
204 type FailHandler struct {
// ServeHTTP answers every request with status 400 without inspecting it,
// then reports "http://<Host of the request>" on this.handled. The send
// blocks until the channel has a free buffer slot or a receiver.
208 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
209 resp.WriteHeader(400)
210 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestFailedUploadToStubKeepServer uploads "foo" to a server that rejects
// the request and checks that the reported UploadStatus carries the target
// URL "<server url>/<hash>" and status code 400.
// NOTE(review): st is presumably a FailHandler — confirm.
213 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
217 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
219 UploadToStubHelper(c, st,
220 func(kc KeepClient, url string, reader io.ReadCloser,
221 writer io.WriteCloser, upload_status chan UploadStatus) {
// Upload concurrently; the pipe write below blocks until it is consumed.
223 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
225 writer.Write([]byte("foo"))
230 status := <-upload_status
// Only Url and StatusCode are compared here, not the whole UploadStatus.
231 c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
232 c.Check(status.StatusCode, Equals, 400)
// KeepServer describes one stub server started by RunSomeFakeKeepServers:
// its listener plus its base URL (callers read the url field).
237 type KeepServer struct {
// Closed by the tests to shut the stub server down.
238 listener net.Listener
// RunSomeFakeKeepServers starts n stub servers that all share the handler
// st, on the consecutive ports port, port+1, ..., port+n-1, and records each
// one's listener and URL in ks (index i holds the server on port+i).
242 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
243 ks = make([]KeepServer, n)
245 for i := 0; i < n; i += 1 {
246 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
247 ks[i] = KeepServer{boguslistener, bogusurl}
// TestPutB stores "foo" with PutB against five stub servers (ports 2990 to
// 2994) and expects the first two notifications on st.handled to come from
// the first two entries of the shuffled service roots, in that order.
253 func (s *StandaloneSuite) TestPutB(c *C) {
254 log.Printf("TestPutB")
256 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
258 st := StubPutHandler{
// handled is buffered for two notifications, matching the two reads below.
263 make(chan string, 2)}
// The error result of MakeKeepClient is discarded.
265 kc, _ := MakeKeepClient()
268 kc.ApiToken = "abc123"
269 kc.Service_roots = make([]string, 5)
271 ks := RunSomeFakeKeepServers(st, 5, 2990)
273 for i := 0; i < len(ks); i += 1 {
274 kc.Service_roots[i] = ks[i].url
// Deferred calls run when the test function returns, not per iteration.
275 defer ks[i].listener.Close()
278 sort.Strings(kc.Service_roots)
// All three results of PutB are ignored here; only the handler
// notifications are verified.
280 kc.PutB([]byte("foo"))
282 shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
284 c.Check(<-st.handled, Equals, shuff[0])
285 c.Check(<-st.handled, Equals, shuff[1])
// TestPutHR stores "foo" with PutHR, feeding the data through an io.Pipe,
// against five stub servers (ports 2990 to 2994). It expects the two servers
// that handled the upload to be the first two shuffled service roots, in
// either order.
288 func (s *StandaloneSuite) TestPutHR(c *C) {
289 log.Printf("TestPutHR")
291 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
293 st := StubPutHandler{
298 make(chan string, 2)}
300 kc, _ := MakeKeepClient()
303 kc.ApiToken = "abc123"
304 kc.Service_roots = make([]string, 5)
306 ks := RunSomeFakeKeepServers(st, 5, 2990)
308 for i := 0; i < len(ks); i += 1 {
309 kc.Service_roots[i] = ks[i].url
310 defer ks[i].listener.Close()
313 sort.Strings(kc.Service_roots)
315 reader, writer := io.Pipe()
// NOTE(review): a pipe write blocks until it is read, so this write
// presumably runs in a separate goroutine from PutHR — confirm.
318 writer.Write([]byte("foo"))
// 3 is the expected content length, len("foo").
322 kc.PutHR(hash, reader, 3)
324 shuff := kc.ShuffledServiceRoots(hash)
// NOTE(review): s1 and s2 are presumably the two values received from
// st.handled — confirm. Either arrival order is accepted.
330 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
331 (s1 == shuff[1] && s2 == shuff[0]),
// TestPutWithFail runs PutB against four working stub servers (ports 2990 to
// 2993, handler st) and one server using handler fh on port 2995. It expects
// PutB to succeed with two replicas, and the two successful uploads to be
// handled by shuffled roots 1 and 2.
// NOTE(review): fh is presumably a FailHandler, and the expectations imply
// that shuffled root 0 is the failing server — confirm.
336 func (s *StandaloneSuite) TestPutWithFail(c *C) {
337 log.Printf("TestPutWithFail")
339 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
341 st := StubPutHandler{
346 make(chan string, 2)}
349 make(chan string, 1)}
351 kc, _ := MakeKeepClient()
354 kc.ApiToken = "abc123"
355 kc.Service_roots = make([]string, 5)
357 ks1 := RunSomeFakeKeepServers(st, 4, 2990)
358 ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
// Roots 0..3 come from ks1; the ks2 server fills the remaining slot.
360 for i, k := range ks1 {
361 kc.Service_roots[i] = k.url
362 defer k.listener.Close()
364 for i, k := range ks2 {
365 kc.Service_roots[len(ks1)+i] = k.url
366 defer k.listener.Close()
369 sort.Strings(kc.Service_roots)
371 shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
373 phash, replicas, err := kc.PutB([]byte("foo"))
377 c.Check(err, Equals, nil)
378 c.Check(phash, Equals, hash)
379 c.Check(replicas, Equals, 2)
380 c.Check(<-st.handled, Equals, shuff[1])
381 c.Check(<-st.handled, Equals, shuff[2])
// TestPutWithTooManyFail runs PutB against one working stub server (port
// 2990, handler st) and four servers using handler fh (ports 2991 to 2994).
// It expects PutB to return InsufficientReplicasError with a replica count
// of 1, and the single successful upload to be handled by shuffled root 1.
// NOTE(review): fh is presumably a FailHandler — confirm.
384 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
385 log.Printf("TestPutWithTooManyFail")
387 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
389 st := StubPutHandler{
394 make(chan string, 1)}
// Buffer of four: one slot per server started with fh.
397 make(chan string, 4)}
399 kc, _ := MakeKeepClient()
402 kc.ApiToken = "abc123"
403 kc.Service_roots = make([]string, 5)
405 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
406 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
408 for i, k := range ks1 {
409 kc.Service_roots[i] = k.url
410 defer k.listener.Close()
412 for i, k := range ks2 {
413 kc.Service_roots[len(ks1)+i] = k.url
414 defer k.listener.Close()
417 sort.Strings(kc.Service_roots)
419 shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
// The returned hash is ignored in this test.
421 _, replicas, err := kc.PutB([]byte("foo"))
423 c.Check(err, Equals, InsufficientReplicasError)
424 c.Check(replicas, Equals, 1)
425 c.Check(<-st.handled, Equals, shuff[1])
// StubGetHandler is the stub HTTP handler used by the Get tests. Its
// ServeHTTP method reads the fields c, expectPath, expectApiToken and
// returnBody.
428 type StubGetHandler struct {
// Token expected after "OAuth2 " in the request's Authorization header.
431 expectApiToken string
// ServeHTTP verifies one incoming request and replies with returnBody.
// Through this.c it checks that the URL path is "/" + expectPath and that
// the Authorization header is "OAuth2 " + expectApiToken. It then sets
// Content-Length to len(returnBody) and writes returnBody; no explicit
// status code is written.
435 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
436 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
437 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
438 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
439 resp.Write(this.returnBody)
// TestGet fetches the block whose hash is md5("foo") from a single
// StubGetHandler server on port 2990. It expects a nil error, a length of 3,
// the source URL "<server url>/<hash>", and the content "foo".
442 func (s *StandaloneSuite) TestGet(c *C) {
444 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
446 st := StubGetHandler{
452 listener, url := RunBogusKeepServer(st, 2990)
453 defer listener.Close()
// The error result of MakeKeepClient is discarded; the stub is the client's
// only service root.
455 kc, _ := MakeKeepClient()
456 kc.ApiToken = "abc123"
457 kc.Service_roots = []string{url}
459 r, n, url2, err := kc.Get(hash)
460 c.Check(err, Equals, nil)
461 c.Check(n, Equals, int64(3))
462 c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
// Drain the returned reader to compare the body.
464 content, err2 := ioutil.ReadAll(r)
465 c.Check(err2, Equals, nil)
466 c.Check(content, DeepEquals, []byte("foo"))
// TestGetFail calls Get against a single FailHandler server (which always
// answers 400) and expects BlockNotFound with zero-valued results: length 0,
// empty URL and a nil reader.
469 func (s *StandaloneSuite) TestGetFail(c *C) {
470 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
// Buffer of one lets the handler report its single request without a
// receiver; this test never reads from the channel.
472 st := FailHandler{make(chan string, 1)}
474 listener, url := RunBogusKeepServer(st, 2990)
475 defer listener.Close()
477 kc, _ := MakeKeepClient()
478 kc.ApiToken = "abc123"
479 kc.Service_roots = []string{url}
481 r, n, url2, err := kc.Get(hash)
482 c.Check(err, Equals, BlockNotFound)
483 c.Check(n, Equals, int64(0))
484 c.Check(url2, Equals, "")
485 c.Check(r, Equals, nil)
// TestGetWithFailures calls Get with five service roots: one StubGetHandler
// server (port 2990) and four servers using handler fh (ports 2991 to 2994).
// It expects Get to succeed from the stub server: nil error, length 3,
// source URL "<ks1[0].url>/<hash>", and content "foo".
// NOTE(review): fh is presumably a FailHandler — confirm.
488 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
490 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
// NOTE(review): if this one-slot channel is fh's handled channel, a second
// request to an fh server would block in the handler's send, since nothing
// here reads from it — confirm how many fh servers Get tries before the
// stub.
493 make(chan string, 1)}
495 st := StubGetHandler{
501 kc, _ := MakeKeepClient()
502 kc.ApiToken = "abc123"
503 kc.Service_roots = make([]string, 5)
505 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
506 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
508 for i, k := range ks1 {
509 kc.Service_roots[i] = k.url
510 defer k.listener.Close()
512 for i, k := range ks2 {
513 kc.Service_roots[len(ks1)+i] = k.url
514 defer k.listener.Close()
517 sort.Strings(kc.Service_roots)
519 r, n, url2, err := kc.Get(hash)
521 c.Check(err, Equals, nil)
522 c.Check(n, Equals, int64(3))
523 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
525 content, err2 := ioutil.ReadAll(r)
526 c.Check(err2, Equals, nil)
527 c.Check(content, DeepEquals, []byte("foo"))
// TestPutGetHead exercises PutB, Get and Ask in sequence with a client
// configured from the ARVADOS_* environment variables set below (API host
// localhost:3001, insecure mode "true").
// It stores "foo", expects two replicas and the md5 of "foo" as the hash,
// then expects both Get and Ask to report length 3 and the URL
// "http://localhost:25108/<hash>".
530 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
531 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
532 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
533 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
// Assert (rather than Check) on the client error before using kc.
535 kc, err := MakeKeepClient()
536 c.Assert(err, Equals, nil)
538 hash, replicas, err := kc.PutB([]byte("foo"))
539 c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
540 c.Check(replicas, Equals, 2)
541 c.Check(err, Equals, nil)
// Read the block back and compare its length, source URL and content.
544 r, n, url2, err := kc.Get(hash)
545 c.Check(err, Equals, nil)
546 c.Check(n, Equals, int64(3))
547 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
549 content, err2 := ioutil.ReadAll(r)
550 c.Check(err2, Equals, nil)
551 c.Check(content, DeepEquals, []byte("foo"))
// Ask returns only the length and the URL, no reader.
555 n, url2, err := kc.Ask(hash)
556 c.Check(err, Equals, nil)
557 c.Check(n, Equals, int64(3))
558 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))