1885: Renamed DiscoverKeepDisks to DiscoverKeepServers, moved error.New()
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "flag"
6         "fmt"
7         . "gopkg.in/check.v1"
8         "io"
9         "io/ioutil"
10         "log"
11         "net"
12         "net/http"
13         "os"
14         "os/exec"
15         "sort"
16         "strings"
17         "testing"
18         "time"
19 )
20
21 // Gocheck boilerplate
22 func Test(t *testing.T) { TestingT(t) }
23
24 // Gocheck boilerplate
25 var _ = Suite(&ServerRequiredSuite{})
26 var _ = Suite(&StandaloneSuite{})
27
28 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
29
30 // Tests that require the Keep server running
31 type ServerRequiredSuite struct{}
32
33 // Standalone tests
34 type StandaloneSuite struct{}
35
36 func pythonDir() string {
37         gopath := os.Getenv("GOPATH")
38         return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
39 }
40
41 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
42         if *no_server {
43                 c.Skip("Skipping tests that require server")
44         } else {
45                 os.Chdir(pythonDir())
46                 exec.Command("python", "run_test_server.py", "start").Run()
47                 exec.Command("python", "run_test_server.py", "start_keep").Run()
48         }
49 }
50
51 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
52         os.Chdir(pythonDir())
53         exec.Command("python", "run_test_server.py", "stop_keep").Run()
54         exec.Command("python", "run_test_server.py", "stop").Run()
55 }
56
57 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
58         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
59         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
60         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
61
62         kc, err := MakeKeepClient()
63         c.Assert(kc.ApiServer, Equals, "localhost:3001")
64         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
65         c.Assert(kc.ApiInsecure, Equals, false)
66
67         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
68
69         kc, err = MakeKeepClient()
70         c.Assert(kc.ApiServer, Equals, "localhost:3001")
71         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
72         c.Assert(kc.ApiInsecure, Equals, true)
73
74         c.Assert(err, Equals, nil)
75         c.Assert(len(kc.Service_roots), Equals, 2)
76         c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
77         c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
78 }
79
80 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
81         kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
82
83         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
84         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
85         c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
86
87         // "bar" 37b51d194a7513e45b56f6524f2d51f2
88         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
89         c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
90 }
91
92 func ReadIntoBufferHelper(c *C, bufsize int) {
93         buffer := make([]byte, bufsize)
94
95         reader, writer := io.Pipe()
96         slices := make(chan ReaderSlice)
97
98         go ReadIntoBuffer(buffer, reader, slices)
99
100         {
101                 out := make([]byte, 128)
102                 for i := 0; i < 128; i += 1 {
103                         out[i] = byte(i)
104                 }
105                 writer.Write(out)
106                 s1 := <-slices
107                 c.Check(len(s1.slice), Equals, 128)
108                 c.Check(s1.reader_error, Equals, nil)
109                 for i := 0; i < 128; i += 1 {
110                         c.Check(s1.slice[i], Equals, byte(i))
111                 }
112                 for i := 0; i < len(buffer); i += 1 {
113                         if i < 128 {
114                                 c.Check(buffer[i], Equals, byte(i))
115                         } else {
116                                 c.Check(buffer[i], Equals, byte(0))
117                         }
118                 }
119         }
120         {
121                 out := make([]byte, 96)
122                 for i := 0; i < 96; i += 1 {
123                         out[i] = byte(i / 2)
124                 }
125                 writer.Write(out)
126                 s1 := <-slices
127                 c.Check(len(s1.slice), Equals, 96)
128                 c.Check(s1.reader_error, Equals, nil)
129                 for i := 0; i < 96; i += 1 {
130                         c.Check(s1.slice[i], Equals, byte(i/2))
131                 }
132                 for i := 0; i < len(buffer); i += 1 {
133                         if i < 128 {
134                                 c.Check(buffer[i], Equals, byte(i))
135                         } else if i < (128 + 96) {
136                                 c.Check(buffer[i], Equals, byte((i-128)/2))
137                         } else {
138                                 c.Check(buffer[i], Equals, byte(0))
139                         }
140                 }
141         }
142         {
143                 writer.Close()
144                 s1 := <-slices
145                 c.Check(len(s1.slice), Equals, 0)
146                 c.Check(s1.reader_error, Equals, io.EOF)
147         }
148 }
149
150 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
151         ReadIntoBufferHelper(c, 512)
152         ReadIntoBufferHelper(c, 225)
153         ReadIntoBufferHelper(c, 224)
154 }
155
156 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
157         buffer := make([]byte, 223)
158         reader, writer := io.Pipe()
159         slices := make(chan ReaderSlice)
160
161         go ReadIntoBuffer(buffer, reader, slices)
162
163         {
164                 out := make([]byte, 128)
165                 for i := 0; i < 128; i += 1 {
166                         out[i] = byte(i)
167                 }
168                 writer.Write(out)
169                 s1 := <-slices
170                 c.Check(len(s1.slice), Equals, 128)
171                 c.Check(s1.reader_error, Equals, nil)
172                 for i := 0; i < 128; i += 1 {
173                         c.Check(s1.slice[i], Equals, byte(i))
174                 }
175                 for i := 0; i < len(buffer); i += 1 {
176                         if i < 128 {
177                                 c.Check(buffer[i], Equals, byte(i))
178                         } else {
179                                 c.Check(buffer[i], Equals, byte(0))
180                         }
181                 }
182         }
183         {
184                 out := make([]byte, 96)
185                 for i := 0; i < 96; i += 1 {
186                         out[i] = byte(i / 2)
187                 }
188
189                 // Write will deadlock because it can't write all the data, so
190                 // spin it off to a goroutine
191                 go writer.Write(out)
192                 s1 := <-slices
193
194                 c.Check(len(s1.slice), Equals, 95)
195                 c.Check(s1.reader_error, Equals, nil)
196                 for i := 0; i < 95; i += 1 {
197                         c.Check(s1.slice[i], Equals, byte(i/2))
198                 }
199                 for i := 0; i < len(buffer); i += 1 {
200                         if i < 128 {
201                                 c.Check(buffer[i], Equals, byte(i))
202                         } else if i < (128 + 95) {
203                                 c.Check(buffer[i], Equals, byte((i-128)/2))
204                         } else {
205                                 c.Check(buffer[i], Equals, byte(0))
206                         }
207                 }
208         }
209         {
210                 writer.Close()
211                 s1 := <-slices
212                 c.Check(len(s1.slice), Equals, 0)
213                 c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
214         }
215
216 }
217
218 func (s *StandaloneSuite) TestTransfer(c *C) {
219         reader, writer := io.Pipe()
220
221         // Buffer for reads from 'r'
222         buffer := make([]byte, 512)
223
224         // Read requests on Transfer() buffer
225         requests := make(chan ReadRequest)
226         defer close(requests)
227
228         // Reporting reader error states
229         reader_status := make(chan error)
230
231         go Transfer(buffer, reader, requests, reader_status)
232
233         br1 := MakeBufferReader(requests)
234         out := make([]byte, 128)
235
236         {
237                 // Write some data, and read into a buffer shorter than
238                 // available data
239                 for i := 0; i < 128; i += 1 {
240                         out[i] = byte(i)
241                 }
242
243                 writer.Write(out[:100])
244
245                 in := make([]byte, 64)
246                 n, err := br1.Read(in)
247
248                 c.Check(n, Equals, 64)
249                 c.Check(err, Equals, nil)
250
251                 for i := 0; i < 64; i += 1 {
252                         c.Check(in[i], Equals, out[i])
253                 }
254         }
255
256         {
257                 // Write some more data, and read into buffer longer than
258                 // available data
259                 in := make([]byte, 64)
260                 n, err := br1.Read(in)
261                 c.Check(n, Equals, 36)
262                 c.Check(err, Equals, nil)
263
264                 for i := 0; i < 36; i += 1 {
265                         c.Check(in[i], Equals, out[64+i])
266                 }
267
268         }
269
270         {
271                 // Test read before write
272                 type Rd struct {
273                         n   int
274                         err error
275                 }
276                 rd := make(chan Rd)
277                 in := make([]byte, 64)
278
279                 go func() {
280                         n, err := br1.Read(in)
281                         rd <- Rd{n, err}
282                 }()
283
284                 time.Sleep(100 * time.Millisecond)
285                 writer.Write(out[100:])
286
287                 got := <-rd
288
289                 c.Check(got.n, Equals, 28)
290                 c.Check(got.err, Equals, nil)
291
292                 for i := 0; i < 28; i += 1 {
293                         c.Check(in[i], Equals, out[100+i])
294                 }
295         }
296
297         br2 := MakeBufferReader(requests)
298         {
299                 // Test 'catch up' reader
300                 in := make([]byte, 256)
301                 n, err := br2.Read(in)
302
303                 c.Check(n, Equals, 128)
304                 c.Check(err, Equals, nil)
305
306                 for i := 0; i < 128; i += 1 {
307                         c.Check(in[i], Equals, out[i])
308                 }
309         }
310
311         {
312                 // Test closing the reader
313                 writer.Close()
314                 status := <-reader_status
315                 c.Check(status, Equals, io.EOF)
316
317                 in := make([]byte, 256)
318                 n1, err1 := br1.Read(in)
319                 n2, err2 := br2.Read(in)
320                 c.Check(n1, Equals, 0)
321                 c.Check(err1, Equals, io.EOF)
322                 c.Check(n2, Equals, 0)
323                 c.Check(err2, Equals, io.EOF)
324         }
325
326         {
327                 // Test 'catch up' reader after closing
328                 br3 := MakeBufferReader(requests)
329                 in := make([]byte, 256)
330                 n, err := br3.Read(in)
331
332                 c.Check(n, Equals, 128)
333                 c.Check(err, Equals, nil)
334
335                 for i := 0; i < 128; i += 1 {
336                         c.Check(in[i], Equals, out[i])
337                 }
338
339                 n, err = br3.Read(in)
340
341                 c.Check(n, Equals, 0)
342                 c.Check(err, Equals, io.EOF)
343         }
344 }
345
346 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
347         reader, writer := io.Pipe()
348
349         // Buffer for reads from 'r'
350         buffer := make([]byte, 100)
351
352         // Read requests on Transfer() buffer
353         requests := make(chan ReadRequest)
354         defer close(requests)
355
356         // Reporting reader error states
357         reader_status := make(chan error)
358
359         go Transfer(buffer, reader, requests, reader_status)
360
361         out := make([]byte, 101)
362         go writer.Write(out)
363
364         status := <-reader_status
365         c.Check(status, Equals, io.ErrShortBuffer)
366 }
367
368 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
369         // Buffer for reads from 'r'
370         buffer := make([]byte, 100)
371         for i := 0; i < 100; i += 1 {
372                 buffer[i] = byte(i)
373         }
374
375         // Read requests on Transfer() buffer
376         requests := make(chan ReadRequest)
377         defer close(requests)
378
379         go Transfer(buffer, nil, requests, nil)
380
381         br1 := MakeBufferReader(requests)
382
383         in := make([]byte, 64)
384         {
385                 n, err := br1.Read(in)
386
387                 c.Check(n, Equals, 64)
388                 c.Check(err, Equals, nil)
389
390                 for i := 0; i < 64; i += 1 {
391                         c.Check(in[i], Equals, buffer[i])
392                 }
393         }
394         {
395                 n, err := br1.Read(in)
396
397                 c.Check(n, Equals, 36)
398                 c.Check(err, Equals, nil)
399
400                 for i := 0; i < 36; i += 1 {
401                         c.Check(in[i], Equals, buffer[64+i])
402                 }
403         }
404         {
405                 n, err := br1.Read(in)
406
407                 c.Check(n, Equals, 0)
408                 c.Check(err, Equals, io.EOF)
409         }
410 }
411
412 func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
413         // Buffer for reads from 'r'
414         buffer := make([]byte, 100)
415         for i := 0; i < 100; i += 1 {
416                 buffer[i] = byte(i)
417         }
418
419         // Read requests on Transfer() buffer
420         requests := make(chan ReadRequest)
421         defer close(requests)
422
423         go Transfer(buffer, nil, requests, nil)
424
425         br1 := MakeBufferReader(requests)
426
427         reader, writer := io.Pipe()
428
429         go func() {
430                 p := make([]byte, 100)
431                 n, err := reader.Read(p)
432                 c.Check(n, Equals, 100)
433                 c.Check(err, Equals, nil)
434                 c.Check(p, DeepEquals, buffer)
435         }()
436
437         io.Copy(writer, br1)
438 }
439
440 type StubPutHandler struct {
441         c              *C
442         expectPath     string
443         expectApiToken string
444         expectBody     string
445         handled        chan string
446 }
447
448 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
449         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
450         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
451         body, err := ioutil.ReadAll(req.Body)
452         this.c.Check(err, Equals, nil)
453         this.c.Check(body, DeepEquals, []byte(this.expectBody))
454         resp.WriteHeader(200)
455         this.handled <- fmt.Sprintf("http://%s", req.Host)
456 }
457
458 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
459         server := http.Server{Handler: st}
460
461         var err error
462         listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
463         if err != nil {
464                 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
465         }
466
467         url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
468
469         go server.Serve(listener)
470         return listener, url
471 }
472
473 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
474         io.ReadCloser, io.WriteCloser, chan UploadStatus)) {
475
476         listener, url := RunBogusKeepServer(st, 2990)
477         defer listener.Close()
478
479         kc, _ := MakeKeepClient()
480         kc.ApiToken = "abc123"
481
482         reader, writer := io.Pipe()
483         upload_status := make(chan UploadStatus)
484
485         f(kc, url, reader, writer, upload_status)
486 }
487
488 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
489         st := StubPutHandler{
490                 c,
491                 "acbd18db4cc2f85cedef654fccc4a4d8",
492                 "abc123",
493                 "foo",
494                 make(chan string)}
495
496         UploadToStubHelper(c, st,
497                 func(kc KeepClient, url string, reader io.ReadCloser,
498                         writer io.WriteCloser, upload_status chan UploadStatus) {
499
500                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
501
502                         writer.Write([]byte("foo"))
503                         writer.Close()
504
505                         <-st.handled
506                         status := <-upload_status
507                         c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
508                 })
509 }
510
511 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
512         st := StubPutHandler{
513                 c,
514                 "acbd18db4cc2f85cedef654fccc4a4d8",
515                 "abc123",
516                 "foo",
517                 make(chan string)}
518
519         UploadToStubHelper(c, st,
520                 func(kc KeepClient, url string, reader io.ReadCloser,
521                         writer io.WriteCloser, upload_status chan UploadStatus) {
522
523                         // Buffer for reads from 'r'
524                         buffer := make([]byte, 512)
525
526                         // Read requests on Transfer() buffer
527                         requests := make(chan ReadRequest)
528                         defer close(requests)
529
530                         // Reporting reader error states
531                         reader_status := make(chan error)
532
533                         go Transfer(buffer, reader, requests, reader_status)
534
535                         br1 := MakeBufferReader(requests)
536
537                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
538
539                         writer.Write([]byte("foo"))
540                         writer.Close()
541
542                         <-reader_status
543                         <-st.handled
544
545                         status := <-upload_status
546                         c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
547
548                         //c.Check(true, Equals, false)
549                 })
550 }
551
552 type FailHandler struct {
553         handled chan string
554 }
555
556 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
557         resp.WriteHeader(400)
558         this.handled <- fmt.Sprintf("http://%s", req.Host)
559 }
560
561 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
562         st := FailHandler{
563                 make(chan string)}
564
565         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
566
567         UploadToStubHelper(c, st,
568                 func(kc KeepClient, url string, reader io.ReadCloser,
569                         writer io.WriteCloser, upload_status chan UploadStatus) {
570
571                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
572
573                         writer.Write([]byte("foo"))
574                         writer.Close()
575
576                         <-st.handled
577
578                         status := <-upload_status
579                         c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
580                         c.Check(status.StatusCode, Equals, 400)
581                 })
582
583 }
584
585 type KeepServer struct {
586         listener net.Listener
587         url      string
588 }
589
590 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
591         ks = make([]KeepServer, n)
592
593         for i := 0; i < n; i += 1 {
594                 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
595                 ks[i] = KeepServer{boguslistener, bogusurl}
596         }
597
598         return ks
599 }
600
601 func (s *StandaloneSuite) TestPutB(c *C) {
602         log.Printf("TestPutB")
603
604         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
605
606         st := StubPutHandler{
607                 c,
608                 hash,
609                 "abc123",
610                 "foo",
611                 make(chan string, 2)}
612
613         kc, _ := MakeKeepClient()
614
615         kc.Want_replicas = 2
616         kc.ApiToken = "abc123"
617         kc.Service_roots = make([]string, 5)
618
619         ks := RunSomeFakeKeepServers(st, 5, 2990)
620
621         for i := 0; i < len(ks); i += 1 {
622                 kc.Service_roots[i] = ks[i].url
623                 defer ks[i].listener.Close()
624         }
625
626         sort.Strings(kc.Service_roots)
627
628         kc.PutB([]byte("foo"))
629
630         shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
631
632         c.Check(<-st.handled, Equals, shuff[0])
633         c.Check(<-st.handled, Equals, shuff[1])
634 }
635
636 func (s *StandaloneSuite) TestPutHR(c *C) {
637         log.Printf("TestPutHR")
638
639         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
640
641         st := StubPutHandler{
642                 c,
643                 hash,
644                 "abc123",
645                 "foo",
646                 make(chan string, 2)}
647
648         kc, _ := MakeKeepClient()
649
650         kc.Want_replicas = 2
651         kc.ApiToken = "abc123"
652         kc.Service_roots = make([]string, 5)
653
654         ks := RunSomeFakeKeepServers(st, 5, 2990)
655
656         for i := 0; i < len(ks); i += 1 {
657                 kc.Service_roots[i] = ks[i].url
658                 defer ks[i].listener.Close()
659         }
660
661         sort.Strings(kc.Service_roots)
662
663         reader, writer := io.Pipe()
664
665         go func() {
666                 writer.Write([]byte("foo"))
667                 writer.Close()
668         }()
669
670         kc.PutHR(hash, reader, 3)
671
672         shuff := kc.ShuffledServiceRoots(hash)
673         log.Print(shuff)
674
675         s1 := <-st.handled
676         s2 := <-st.handled
677
678         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
679                 (s1 == shuff[1] && s2 == shuff[0]),
680                 Equals,
681                 true)
682 }
683
684 func (s *StandaloneSuite) TestPutWithFail(c *C) {
685         log.Printf("TestPutWithFail")
686
687         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
688
689         st := StubPutHandler{
690                 c,
691                 hash,
692                 "abc123",
693                 "foo",
694                 make(chan string, 2)}
695
696         fh := FailHandler{
697                 make(chan string, 1)}
698
699         kc, _ := MakeKeepClient()
700
701         kc.Want_replicas = 2
702         kc.ApiToken = "abc123"
703         kc.Service_roots = make([]string, 5)
704
705         ks1 := RunSomeFakeKeepServers(st, 4, 2990)
706         ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
707
708         for i, k := range ks1 {
709                 kc.Service_roots[i] = k.url
710                 defer k.listener.Close()
711         }
712         for i, k := range ks2 {
713                 kc.Service_roots[len(ks1)+i] = k.url
714                 defer k.listener.Close()
715         }
716
717         sort.Strings(kc.Service_roots)
718
719         shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
720
721         phash, replicas, err := kc.PutB([]byte("foo"))
722
723         <-fh.handled
724
725         c.Check(err, Equals, nil)
726         c.Check(phash, Equals, hash)
727         c.Check(replicas, Equals, 2)
728         c.Check(<-st.handled, Equals, shuff[1])
729         c.Check(<-st.handled, Equals, shuff[2])
730 }
731
732 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
733         log.Printf("TestPutWithTooManyFail")
734
735         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
736
737         st := StubPutHandler{
738                 c,
739                 hash,
740                 "abc123",
741                 "foo",
742                 make(chan string, 1)}
743
744         fh := FailHandler{
745                 make(chan string, 4)}
746
747         kc, _ := MakeKeepClient()
748
749         kc.Want_replicas = 2
750         kc.ApiToken = "abc123"
751         kc.Service_roots = make([]string, 5)
752
753         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
754         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
755
756         for i, k := range ks1 {
757                 kc.Service_roots[i] = k.url
758                 defer k.listener.Close()
759         }
760         for i, k := range ks2 {
761                 kc.Service_roots[len(ks1)+i] = k.url
762                 defer k.listener.Close()
763         }
764
765         sort.Strings(kc.Service_roots)
766
767         shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
768
769         _, replicas, err := kc.PutB([]byte("foo"))
770
771         c.Check(err, Equals, InsufficientReplicasError)
772         c.Check(replicas, Equals, 1)
773         c.Check(<-st.handled, Equals, shuff[1])
774 }
775
776 type StubGetHandler struct {
777         c              *C
778         expectPath     string
779         expectApiToken string
780         returnBody     []byte
781 }
782
783 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
784         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
785         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
786         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
787         resp.Write(this.returnBody)
788 }
789
790 func (s *StandaloneSuite) TestGet(c *C) {
791
792         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
793
794         st := StubGetHandler{
795                 c,
796                 hash,
797                 "abc123",
798                 []byte("foo")}
799
800         listener, url := RunBogusKeepServer(st, 2990)
801         defer listener.Close()
802
803         kc, _ := MakeKeepClient()
804         kc.ApiToken = "abc123"
805         kc.Service_roots = []string{url}
806
807         r, n, url2, err := kc.Get(hash)
808         c.Check(err, Equals, nil)
809         c.Check(n, Equals, int64(3))
810         c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
811
812         content, err2 := ioutil.ReadAll(r)
813         c.Check(err2, Equals, nil)
814         c.Check(content, DeepEquals, []byte("foo"))
815 }
816
817 func (s *StandaloneSuite) TestGetFail(c *C) {
818         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
819
820         st := FailHandler{make(chan string, 1)}
821
822         listener, url := RunBogusKeepServer(st, 2990)
823         defer listener.Close()
824
825         kc, _ := MakeKeepClient()
826         kc.ApiToken = "abc123"
827         kc.Service_roots = []string{url}
828
829         r, n, url2, err := kc.Get(hash)
830         c.Check(err, Equals, BlockNotFound)
831         c.Check(n, Equals, int64(0))
832         c.Check(url2, Equals, "")
833         c.Check(r, Equals, nil)
834 }
835
836 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
837
838         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
839
840         fh := FailHandler{
841                 make(chan string, 1)}
842
843         st := StubGetHandler{
844                 c,
845                 hash,
846                 "abc123",
847                 []byte("foo")}
848
849         kc, _ := MakeKeepClient()
850         kc.ApiToken = "abc123"
851         kc.Service_roots = make([]string, 5)
852
853         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
854         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
855
856         for i, k := range ks1 {
857                 kc.Service_roots[i] = k.url
858                 defer k.listener.Close()
859         }
860         for i, k := range ks2 {
861                 kc.Service_roots[len(ks1)+i] = k.url
862                 defer k.listener.Close()
863         }
864
865         sort.Strings(kc.Service_roots)
866
867         r, n, url2, err := kc.Get(hash)
868         <-fh.handled
869         c.Check(err, Equals, nil)
870         c.Check(n, Equals, int64(3))
871         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
872
873         content, err2 := ioutil.ReadAll(r)
874         c.Check(err2, Equals, nil)
875         c.Check(content, DeepEquals, []byte("foo"))
876 }
877
878 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
879         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
880         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
881         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
882
883         kc, err := MakeKeepClient()
884         c.Assert(err, Equals, nil)
885
886         hash, replicas, err := kc.PutB([]byte("foo"))
887         c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
888         c.Check(replicas, Equals, 2)
889         c.Check(err, Equals, nil)
890
891         {
892                 r, n, url2, err := kc.Get(hash)
893                 c.Check(err, Equals, nil)
894                 c.Check(n, Equals, int64(3))
895                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
896
897                 content, err2 := ioutil.ReadAll(r)
898                 c.Check(err2, Equals, nil)
899                 c.Check(content, DeepEquals, []byte("foo"))
900         }
901
902         {
903                 n, url2, err := kc.Ask(hash)
904                 c.Check(err, Equals, nil)
905                 c.Check(n, Equals, int64(3))
906                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
907         }
908 }