20 // Gocheck boilerplate
21 func Test(t *testing.T) { TestingT(t) }
23 // Gocheck boilerplate
24 var _ = Suite(&ServerRequiredSuite{})
25 var _ = Suite(&StandaloneSuite{})
27 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
type (
	// ServerRequiredSuite holds the tests that require the Keep server
	// running.
	ServerRequiredSuite struct{}

	// StandaloneSuite holds the tests that start their own stub HTTP
	// servers or use in-process pipes.
	StandaloneSuite struct{}
)
// SetUpSuite is the suite-level setup for ServerRequiredSuite. The visible
// statements skip the suite with an explanatory message, change into the
// directory named by "$GOPATH../python", and run run_test_server.py with
// "start" and then "start_keep".
// NOTE(review): the skip is presumably conditional on the -no-server flag —
// confirm against the full function.
// NOTE(review): ExpandEnv substitutes $GOPATH and appends "../python"
// directly, so the path presumably only resolves when GOPATH ends with a
// path separator — confirm.
// NOTE(review): the results of Chdir and Run are discarded, so a server that
// fails to start is not reported here.
35 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
37 c.Skip("Skipping tests that require server")
39 os.Chdir(os.ExpandEnv("$GOPATH../python"))
40 exec.Command("python", "run_test_server.py", "start").Run()
41 exec.Command("python", "run_test_server.py", "start_keep").Run()
45 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
46 os.Chdir(os.ExpandEnv("$GOPATH../python"))
47 exec.Command("python", "run_test_server.py", "stop_keep").Run()
48 exec.Command("python", "run_test_server.py", "stop").Run()
51 func (s *ServerRequiredSuite) TestInit(c *C) {
52 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
53 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
54 os.Setenv("ARVADOS_API_HOST_INSECURE", "")
56 kc, err := MakeKeepClient()
57 c.Assert(kc.ApiServer, Equals, "localhost:3001")
58 c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
59 c.Assert(kc.ApiInsecure, Equals, false)
61 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
63 kc, err = MakeKeepClient()
64 c.Assert(kc.ApiServer, Equals, "localhost:3001")
65 c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
66 c.Assert(kc.ApiInsecure, Equals, true)
68 c.Assert(err, Equals, nil)
69 c.Assert(len(kc.Service_roots), Equals, 2)
70 c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
71 c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
74 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
75 kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
77 // "foo" acbd18db4cc2f85cedef654fccc4a4d8
78 foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
79 c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
81 // "bar" 37b51d194a7513e45b56f6524f2d51f2
82 bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
83 c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
// ReadIntoBufferHelper starts ReadIntoBuffer in a goroutine, reading from an
// io.Pipe into a buffer of bufsize bytes, and checks the ReaderSlice results
// (s1, presumably received from the slices channel — confirm).
//
// The checks expect, in order:
//   - a 128-byte slice holding byte(i), with the rest of the buffer zero;
//   - a 96-byte slice holding byte(i/2), stored in the buffer right after the
//     first 128 bytes;
//   - an empty slice whose reader_error is io.EOF.
86 func ReadIntoBufferHelper(c *C, bufsize int) {
87 buffer := make([]byte, bufsize)
89 reader, writer := io.Pipe()
90 slices := make(chan ReaderSlice)
92 go ReadIntoBuffer(buffer, reader, slices)
95 out := make([]byte, 128)
96 for i := 0; i < 128; i += 1 {
101 c.Check(len(s1.slice), Equals, 128)
102 c.Check(s1.reader_error, Equals, nil)
103 for i := 0; i < 128; i += 1 {
104 c.Check(s1.slice[i], Equals, byte(i))
106 for i := 0; i < len(buffer); i += 1 {
108 c.Check(buffer[i], Equals, byte(i))
110 c.Check(buffer[i], Equals, byte(0))
115 out := make([]byte, 96)
116 for i := 0; i < 96; i += 1 {
121 c.Check(len(s1.slice), Equals, 96)
122 c.Check(s1.reader_error, Equals, nil)
123 for i := 0; i < 96; i += 1 {
124 c.Check(s1.slice[i], Equals, byte(i/2))
126 for i := 0; i < len(buffer); i += 1 {
128 c.Check(buffer[i], Equals, byte(i))
129 } else if i < (128 + 96) {
130 c.Check(buffer[i], Equals, byte((i-128)/2))
132 c.Check(buffer[i], Equals, byte(0))
139 c.Check(len(s1.slice), Equals, 0)
140 c.Check(s1.reader_error, Equals, io.EOF)
144 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
145 ReadIntoBufferHelper(c, 512)
146 ReadIntoBufferHelper(c, 225)
147 ReadIntoBufferHelper(c, 224)
// TestReadIntoShortBuffer exercises ReadIntoBuffer with a 223-byte buffer,
// one byte less than the 128 + 96 bytes prepared in the two "out" slices.
//
// The checks expect a full 128-byte slice first, then a slice with only 95
// bytes of the second chunk (filling the buffer to 128 + 95 = 223), and
// finally an empty slice whose reader_error is io.ErrShortBuffer.
150 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
151 buffer := make([]byte, 223)
152 reader, writer := io.Pipe()
153 slices := make(chan ReaderSlice)
155 go ReadIntoBuffer(buffer, reader, slices)
158 out := make([]byte, 128)
159 for i := 0; i < 128; i += 1 {
164 c.Check(len(s1.slice), Equals, 128)
165 c.Check(s1.reader_error, Equals, nil)
166 for i := 0; i < 128; i += 1 {
167 c.Check(s1.slice[i], Equals, byte(i))
169 for i := 0; i < len(buffer); i += 1 {
171 c.Check(buffer[i], Equals, byte(i))
173 c.Check(buffer[i], Equals, byte(0))
178 out := make([]byte, 96)
179 for i := 0; i < 96; i += 1 {
183 // Write will deadlock because it can't write all the data, so
184 // spin it off to a goroutine
188 c.Check(len(s1.slice), Equals, 95)
189 c.Check(s1.reader_error, Equals, nil)
190 for i := 0; i < 95; i += 1 {
191 c.Check(s1.slice[i], Equals, byte(i/2))
193 for i := 0; i < len(buffer); i += 1 {
195 c.Check(buffer[i], Equals, byte(i))
196 } else if i < (128 + 95) {
197 c.Check(buffer[i], Equals, byte((i-128)/2))
199 c.Check(buffer[i], Equals, byte(0))
206 c.Check(len(s1.slice), Equals, 0)
207 c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
// TestTransfer runs Transfer in a goroutine with a 512-byte buffer fed from
// an io.Pipe, and reads the data back through readers created with
// MakeBufferReader on the shared requests channel.
//
// Scenarios covered, in order:
//   - 100 bytes written, a 64-byte read returns 64 bytes;
//   - the next 64-byte read returns the remaining 36 bytes;
//   - a read issued before the final write of out[100:] returns 28 bytes;
//   - a second reader (br2) catches up and gets all 128 bytes in one read;
//   - after reader_status reports io.EOF, br1 and br2 both return (0, io.EOF);
//   - a reader created afterwards (br3) still gets the 128 bytes, then io.EOF.
212 func (s *StandaloneSuite) TestTransfer(c *C) {
213 reader, writer := io.Pipe()
215 // Buffer for reads from 'r'
216 buffer := make([]byte, 512)
218 // Read requests on Transfer() buffer
219 requests := make(chan ReadRequest)
220 defer close(requests)
222 // Reporting reader error states
223 reader_status := make(chan error)
225 go Transfer(buffer, reader, requests, reader_status)
227 br1 := MakeBufferReader(requests)
228 out := make([]byte, 128)
231 // Write some data, and read into a buffer shorter than
233 for i := 0; i < 128; i += 1 {
237 writer.Write(out[:100])
239 in := make([]byte, 64)
240 n, err := br1.Read(in)
242 c.Check(n, Equals, 64)
243 c.Check(err, Equals, nil)
245 for i := 0; i < 64; i += 1 {
246 c.Check(in[i], Equals, out[i])
251 // Write some more data, and read into buffer longer than
253 in := make([]byte, 64)
254 n, err := br1.Read(in)
255 c.Check(n, Equals, 36)
256 c.Check(err, Equals, nil)
258 for i := 0; i < 36; i += 1 {
259 c.Check(in[i], Equals, out[64+i])
265 // Test read before write
271 in := make([]byte, 64)
274 n, err := br1.Read(in)
// Delay the write so that the Read above is presumably already waiting —
// confirm against the goroutine set-up around it.
278 time.Sleep(100 * time.Millisecond)
279 writer.Write(out[100:])
283 c.Check(got.n, Equals, 28)
284 c.Check(got.err, Equals, nil)
286 for i := 0; i < 28; i += 1 {
287 c.Check(in[i], Equals, out[100+i])
291 br2 := MakeBufferReader(requests)
293 // Test 'catch up' reader
294 in := make([]byte, 256)
295 n, err := br2.Read(in)
297 c.Check(n, Equals, 128)
298 c.Check(err, Equals, nil)
300 for i := 0; i < 128; i += 1 {
301 c.Check(in[i], Equals, out[i])
306 // Test closing the reader
308 status := <-reader_status
309 c.Check(status, Equals, io.EOF)
311 in := make([]byte, 256)
312 n1, err1 := br1.Read(in)
313 n2, err2 := br2.Read(in)
314 c.Check(n1, Equals, 0)
315 c.Check(err1, Equals, io.EOF)
316 c.Check(n2, Equals, 0)
317 c.Check(err2, Equals, io.EOF)
321 // Test 'catch up' reader after closing
322 br3 := MakeBufferReader(requests)
323 in := make([]byte, 256)
324 n, err := br3.Read(in)
326 c.Check(n, Equals, 128)
327 c.Check(err, Equals, nil)
329 for i := 0; i < 128; i += 1 {
330 c.Check(in[i], Equals, out[i])
333 n, err = br3.Read(in)
335 c.Check(n, Equals, 0)
336 c.Check(err, Equals, io.EOF)
// TestTransferShortBuffer runs Transfer with a 100-byte buffer and expects
// io.ErrShortBuffer on the reader_status channel.
// NOTE(review): the 101-byte "out" slice is presumably written to the pipe
// to overflow the buffer — confirm.
340 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
341 reader, writer := io.Pipe()
343 // Buffer for reads from 'r'
344 buffer := make([]byte, 100)
346 // Read requests on Transfer() buffer
347 requests := make(chan ReadRequest)
348 defer close(requests)
350 // Reporting reader error states
351 reader_status := make(chan error)
353 go Transfer(buffer, reader, requests, reader_status)
355 out := make([]byte, 101)
358 status := <-reader_status
359 c.Check(status, Equals, io.ErrShortBuffer)
// TestTransferFromBuffer runs Transfer over an existing 100-byte buffer,
// passing nil for both the source reader and the status channel. Reading
// through a buffer reader with a 64-byte destination is expected to return
// 64 bytes, then the remaining 36 bytes, then (0, io.EOF).
362 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
363 // Buffer for reads from 'r'
364 buffer := make([]byte, 100)
365 for i := 0; i < 100; i += 1 {
369 // Read requests on Transfer() buffer
370 requests := make(chan ReadRequest)
371 defer close(requests)
373 go Transfer(buffer, nil, requests, nil)
375 br1 := MakeBufferReader(requests)
377 in := make([]byte, 64)
379 n, err := br1.Read(in)
381 c.Check(n, Equals, 64)
382 c.Check(err, Equals, nil)
384 for i := 0; i < 64; i += 1 {
385 c.Check(in[i], Equals, buffer[i])
389 n, err := br1.Read(in)
391 c.Check(n, Equals, 36)
392 c.Check(err, Equals, nil)
394 for i := 0; i < 36; i += 1 {
395 c.Check(in[i], Equals, buffer[64+i])
399 n, err := br1.Read(in)
401 c.Check(n, Equals, 0)
402 c.Check(err, Equals, io.EOF)
// TestTransferIoCopy runs Transfer over an existing 100-byte buffer (nil
// source reader, nil status channel) and expects a single 100-byte read from
// the read end of an io.Pipe to yield exactly the buffer contents.
// NOTE(review): how br1 feeds the pipe writer is not visible here —
// presumably io.Copy(writer, br1), going by the test name; confirm.
406 func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
407 // Buffer for reads from 'r'
408 buffer := make([]byte, 100)
409 for i := 0; i < 100; i += 1 {
413 // Read requests on Transfer() buffer
414 requests := make(chan ReadRequest)
415 defer close(requests)
417 go Transfer(buffer, nil, requests, nil)
419 br1 := MakeBufferReader(requests)
421 reader, writer := io.Pipe()
424 p := make([]byte, 100)
425 n, err := reader.Read(p)
426 c.Check(n, Equals, 100)
427 c.Check(err, Equals, nil)
428 c.Check(p, DeepEquals, buffer)
// StubHandler is the HTTP handler used by the upload tests. Its ServeHTTP
// method compares each incoming request against the expect* fields and then
// reports the request host on the handled channel.
434 type StubHandler struct {
// expectApiToken is the token expected after "OAuth2 " in the
// Authorization header.
437 expectApiToken string
442 func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
443 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
444 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
445 body, err := ioutil.ReadAll(req.Body)
446 this.c.Check(err, Equals, nil)
447 this.c.Check(body, DeepEquals, []byte(this.expectBody))
448 resp.WriteHeader(200)
449 this.handled <- fmt.Sprintf("http://%s", req.Host)
// RunBogusKeepServer serves the handler st from an http.Server on a TCP
// listener bound to the given port, running Serve in its own goroutine.
//
// It returns the listener and a "http://localhost:<port>" URL built from the
// port of the listener's actual address. The panic message reports the
// requested port; presumably it fires when ListenTCP fails — confirm.
452 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
453 server := http.Server{Handler: st}
456 listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
458 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
461 url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
463 go server.Serve(listener)
467 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
468 io.ReadCloser, io.WriteCloser, chan UploadStatus)) {
470 listener, url := RunBogusKeepServer(st, 2990)
471 defer listener.Close()
473 kc, _ := MakeKeepClient()
474 kc.ApiToken = "abc123"
476 reader, writer := io.Pipe()
477 upload_status := make(chan UploadStatus)
479 f(kc, url, reader, writer, upload_status)
// TestUploadToStubKeepServer uploads "foo" to a stub server: it starts
// uploadToKeepServer in a goroutine reading from the pipe, writes "foo" to
// the pipe, and expects the reported UploadStatus to equal
// UploadStatus{nil, "<url>/<expectPath>", 200}.
482 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
485 "acbd18db4cc2f85cedef654fccc4a4d8",
490 UploadToStubHelper(c, st,
491 func(kc *KeepClient, url string, reader io.ReadCloser,
492 writer io.WriteCloser, upload_status chan UploadStatus) {
494 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
496 writer.Write([]byte("foo"))
500 status := <-upload_status
501 c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
// TestUploadToStubKeepServerBufferReader uploads "foo" to a stub server
// through the Transfer pipeline: Transfer copies the pipe into a 512-byte
// buffer, and uploadToKeepServer reads from a buffer reader (br1) created
// with MakeBufferReader. The reported UploadStatus is expected to equal
// UploadStatus{nil, "<url>/<expectPath>", 200}.
505 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
508 "acbd18db4cc2f85cedef654fccc4a4d8",
513 UploadToStubHelper(c, st,
514 func(kc *KeepClient, url string, reader io.ReadCloser,
515 writer io.WriteCloser, upload_status chan UploadStatus) {
517 // Buffer for reads from 'r'
518 buffer := make([]byte, 512)
520 // Read requests on Transfer() buffer
521 requests := make(chan ReadRequest)
522 defer close(requests)
524 // Reporting reader error states
525 reader_status := make(chan error)
527 go Transfer(buffer, reader, requests, reader_status)
529 br1 := MakeBufferReader(requests)
531 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
533 writer.Write([]byte("foo"))
539 status := <-upload_status
540 c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
542 //c.Check(true, Equals, false)
// FailHandler is an HTTP handler for tests whose ServeHTTP method answers
// every request with status 400 and reports the request host on its handled
// channel.
546 type FailHandler struct {
550 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
551 resp.WriteHeader(400)
552 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestFailedUploadToStubKeepServer uploads "foo" with uploadToKeepServer and
// expects the reported UploadStatus to carry the URL "<url>/<hash>" and
// status code 400.
// NOTE(review): st is presumably a FailHandler, given the expected 400 —
// confirm.
555 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
559 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
561 UploadToStubHelper(c, st,
562 func(kc *KeepClient, url string, reader io.ReadCloser,
563 writer io.WriteCloser, upload_status chan UploadStatus) {
565 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
567 writer.Write([]byte("foo"))
572 status := <-upload_status
573 c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
574 c.Check(status.StatusCode, Equals, 400)
// KeepServer describes one fake Keep server started by
// RunSomeFakeKeepServers.
579 type KeepServer struct {
// listener is the server's listener; the tests close it to shut the
// server down.
580 listener net.Listener
// RunSomeFakeKeepServers starts n stub servers with RunBogusKeepServer, all
// sharing the handler st, on the consecutive ports port, port+1, ...,
// port+n-1. Each server's listener and URL are stored in the corresponding
// element of ks.
584 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
585 ks = make([]KeepServer, n)
587 for i := 0; i < n; i += 1 {
588 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
589 ks[i] = KeepServer{boguslistener, bogusurl}
// TestPutB starts five fake Keep servers on ports starting at 2990, uses
// their URLs (sorted) as the client's service roots, and stores "foo" with
// PutB. The first two hosts reported on st.handled are expected to be the
// first two entries of ShuffledServiceRoots for the md5 hash of "foo", in
// that order.
595 func (s *StandaloneSuite) TestPutB(c *C) {
596 log.Printf("TestPutB")
598 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
605 make(chan string, 2)}
607 kc, _ := MakeKeepClient()
610 kc.ApiToken = "abc123"
611 kc.Service_roots = make([]string, 5)
613 ks := RunSomeFakeKeepServers(st, 5, 2990)
615 for i := 0; i < len(ks); i += 1 {
616 kc.Service_roots[i] = ks[i].url
// Deferred calls run when the test function returns, so every listener
// stays open for the whole test.
617 defer ks[i].listener.Close()
620 sort.Strings(kc.Service_roots)
622 kc.PutB([]byte("foo"))
624 shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
626 c.Check(<-st.handled, Equals, shuff[0])
627 c.Check(<-st.handled, Equals, shuff[1])
// TestPutHR starts five fake Keep servers on ports starting at 2990, uses
// their URLs (sorted) as the client's service roots, and stores "foo" with
// PutHR, passing the hash, the read end of an io.Pipe and the length 3. The
// first two hosts reported on st.handled are expected to be the first two
// entries of ShuffledServiceRoots(hash), in that order.
630 func (s *StandaloneSuite) TestPutHR(c *C) {
631 log.Printf("TestPutHR")
633 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
640 make(chan string, 2)}
642 kc, _ := MakeKeepClient()
645 kc.ApiToken = "abc123"
646 kc.Service_roots = make([]string, 5)
648 ks := RunSomeFakeKeepServers(st, 5, 2990)
650 for i := 0; i < len(ks); i += 1 {
651 kc.Service_roots[i] = ks[i].url
// Deferred calls run when the test function returns, so every listener
// stays open for the whole test.
652 defer ks[i].listener.Close()
655 sort.Strings(kc.Service_roots)
657 reader, writer := io.Pipe()
660 writer.Write([]byte("foo"))
664 kc.PutHR(hash, reader, 3)
666 shuff := kc.ShuffledServiceRoots(hash)
668 c.Check(<-st.handled, Equals, shuff[0])
669 c.Check(<-st.handled, Equals, shuff[1])
// TestPutWithFail mixes four servers using handler st (ports from 2990) with
// one server using handler fh (port 2995), and stores "foo" with PutB. PutB
// is expected to return a nil error, and the hosts reported on st.handled
// are expected to be shuff[1] and shuff[2].
// NOTE(review): shuff[0] is presumably the fh server, which would explain
// why it is skipped — confirm.
672 func (s *StandaloneSuite) TestPutWithFail(c *C) {
673 log.Printf("TestPutWithFail")
675 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
682 make(chan string, 2)}
685 make(chan string, 1)}
687 kc, _ := MakeKeepClient()
690 kc.ApiToken = "abc123"
691 kc.Service_roots = make([]string, 5)
693 ks1 := RunSomeFakeKeepServers(st, 4, 2990)
694 ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
696 for i, k := range ks1 {
697 kc.Service_roots[i] = k.url
698 defer k.listener.Close()
700 for i, k := range ks2 {
// The fh servers fill the slots after the st servers.
701 kc.Service_roots[len(ks1)+i] = k.url
702 defer k.listener.Close()
705 sort.Strings(kc.Service_roots)
707 shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
709 err := kc.PutB([]byte("foo"))
713 c.Check(err, Equals, nil)
714 c.Check(<-st.handled, Equals, shuff[1])
715 c.Check(<-st.handled, Equals, shuff[2])
// TestPutWithTooManyFail mixes one server using handler st (port 2990) with
// four servers using handler fh (ports from 2991), and stores "foo" with
// PutB. PutB is expected to return InsufficientReplicasError, and the host
// reported on st.handled is expected to be shuff[1].
// NOTE(review): the fh servers presumably reject every upload, leaving only
// one successful replica — confirm.
718 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
719 log.Printf("TestPutWithTooManyFail")
721 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
728 make(chan string, 1)}
731 make(chan string, 4)}
733 kc, _ := MakeKeepClient()
736 kc.ApiToken = "abc123"
737 kc.Service_roots = make([]string, 5)
739 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
740 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
742 for i, k := range ks1 {
743 kc.Service_roots[i] = k.url
744 defer k.listener.Close()
746 for i, k := range ks2 {
// The fh servers fill the slots after the st server.
747 kc.Service_roots[len(ks1)+i] = k.url
748 defer k.listener.Close()
751 sort.Strings(kc.Service_roots)
753 shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
755 err := kc.PutB([]byte("foo"))
757 c.Check(err, Equals, InsufficientReplicasError)
758 c.Check(<-st.handled, Equals, shuff[1])