00a2063cb6b46094dc42a3456382c2c7c17ae35c
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "flag"
6         "fmt"
7         . "gopkg.in/check.v1"
8         "io"
9         "io/ioutil"
10         "log"
11         "net"
12         "net/http"
13         "os"
14         "os/exec"
15         "sort"
16         "testing"
17         "time"
18 )
19
20 // Gocheck boilerplate
21 func Test(t *testing.T) { TestingT(t) }
22
23 // Gocheck boilerplate
24 var _ = Suite(&ServerRequiredSuite{})
25 var _ = Suite(&StandaloneSuite{})
26
27 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
28
29 // Tests that require the Keep server running
30 type ServerRequiredSuite struct{}
31
32 // Standalone tests
33 type StandaloneSuite struct{}
34
35 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
36         if *no_server {
37                 c.Skip("Skipping tests that require server")
38         } else {
39                 os.Chdir(os.ExpandEnv("$GOPATH../python"))
40                 exec.Command("python", "run_test_server.py", "start").Run()
41                 exec.Command("python", "run_test_server.py", "start_keep").Run()
42         }
43 }
44
45 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
46         os.Chdir(os.ExpandEnv("$GOPATH../python"))
47         exec.Command("python", "run_test_server.py", "stop_keep").Run()
48         exec.Command("python", "run_test_server.py", "stop").Run()
49 }
50
51 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
52         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
53         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
54         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
55
56         kc, err := MakeKeepClient()
57         c.Assert(kc.ApiServer, Equals, "localhost:3001")
58         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
59         c.Assert(kc.ApiInsecure, Equals, false)
60
61         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
62
63         kc, err = MakeKeepClient()
64         c.Assert(kc.ApiServer, Equals, "localhost:3001")
65         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
66         c.Assert(kc.ApiInsecure, Equals, true)
67
68         c.Assert(err, Equals, nil)
69         c.Assert(len(kc.Service_roots), Equals, 2)
70         c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
71         c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
72 }
73
74 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
75         kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
76
77         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
78         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
79         c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
80
81         // "bar" 37b51d194a7513e45b56f6524f2d51f2
82         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
83         c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
84 }
85
86 func ReadIntoBufferHelper(c *C, bufsize int) {
87         buffer := make([]byte, bufsize)
88
89         reader, writer := io.Pipe()
90         slices := make(chan ReaderSlice)
91
92         go ReadIntoBuffer(buffer, reader, slices)
93
94         {
95                 out := make([]byte, 128)
96                 for i := 0; i < 128; i += 1 {
97                         out[i] = byte(i)
98                 }
99                 writer.Write(out)
100                 s1 := <-slices
101                 c.Check(len(s1.slice), Equals, 128)
102                 c.Check(s1.reader_error, Equals, nil)
103                 for i := 0; i < 128; i += 1 {
104                         c.Check(s1.slice[i], Equals, byte(i))
105                 }
106                 for i := 0; i < len(buffer); i += 1 {
107                         if i < 128 {
108                                 c.Check(buffer[i], Equals, byte(i))
109                         } else {
110                                 c.Check(buffer[i], Equals, byte(0))
111                         }
112                 }
113         }
114         {
115                 out := make([]byte, 96)
116                 for i := 0; i < 96; i += 1 {
117                         out[i] = byte(i / 2)
118                 }
119                 writer.Write(out)
120                 s1 := <-slices
121                 c.Check(len(s1.slice), Equals, 96)
122                 c.Check(s1.reader_error, Equals, nil)
123                 for i := 0; i < 96; i += 1 {
124                         c.Check(s1.slice[i], Equals, byte(i/2))
125                 }
126                 for i := 0; i < len(buffer); i += 1 {
127                         if i < 128 {
128                                 c.Check(buffer[i], Equals, byte(i))
129                         } else if i < (128 + 96) {
130                                 c.Check(buffer[i], Equals, byte((i-128)/2))
131                         } else {
132                                 c.Check(buffer[i], Equals, byte(0))
133                         }
134                 }
135         }
136         {
137                 writer.Close()
138                 s1 := <-slices
139                 c.Check(len(s1.slice), Equals, 0)
140                 c.Check(s1.reader_error, Equals, io.EOF)
141         }
142 }
143
144 func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
145         ReadIntoBufferHelper(c, 512)
146         ReadIntoBufferHelper(c, 225)
147         ReadIntoBufferHelper(c, 224)
148 }
149
150 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
151         buffer := make([]byte, 223)
152         reader, writer := io.Pipe()
153         slices := make(chan ReaderSlice)
154
155         go ReadIntoBuffer(buffer, reader, slices)
156
157         {
158                 out := make([]byte, 128)
159                 for i := 0; i < 128; i += 1 {
160                         out[i] = byte(i)
161                 }
162                 writer.Write(out)
163                 s1 := <-slices
164                 c.Check(len(s1.slice), Equals, 128)
165                 c.Check(s1.reader_error, Equals, nil)
166                 for i := 0; i < 128; i += 1 {
167                         c.Check(s1.slice[i], Equals, byte(i))
168                 }
169                 for i := 0; i < len(buffer); i += 1 {
170                         if i < 128 {
171                                 c.Check(buffer[i], Equals, byte(i))
172                         } else {
173                                 c.Check(buffer[i], Equals, byte(0))
174                         }
175                 }
176         }
177         {
178                 out := make([]byte, 96)
179                 for i := 0; i < 96; i += 1 {
180                         out[i] = byte(i / 2)
181                 }
182
183                 // Write will deadlock because it can't write all the data, so
184                 // spin it off to a goroutine
185                 go writer.Write(out)
186                 s1 := <-slices
187
188                 c.Check(len(s1.slice), Equals, 95)
189                 c.Check(s1.reader_error, Equals, nil)
190                 for i := 0; i < 95; i += 1 {
191                         c.Check(s1.slice[i], Equals, byte(i/2))
192                 }
193                 for i := 0; i < len(buffer); i += 1 {
194                         if i < 128 {
195                                 c.Check(buffer[i], Equals, byte(i))
196                         } else if i < (128 + 95) {
197                                 c.Check(buffer[i], Equals, byte((i-128)/2))
198                         } else {
199                                 c.Check(buffer[i], Equals, byte(0))
200                         }
201                 }
202         }
203         {
204                 writer.Close()
205                 s1 := <-slices
206                 c.Check(len(s1.slice), Equals, 0)
207                 c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
208         }
209
210 }
211
212 func (s *StandaloneSuite) TestTransfer(c *C) {
213         reader, writer := io.Pipe()
214
215         // Buffer for reads from 'r'
216         buffer := make([]byte, 512)
217
218         // Read requests on Transfer() buffer
219         requests := make(chan ReadRequest)
220         defer close(requests)
221
222         // Reporting reader error states
223         reader_status := make(chan error)
224
225         go Transfer(buffer, reader, requests, reader_status)
226
227         br1 := MakeBufferReader(requests)
228         out := make([]byte, 128)
229
230         {
231                 // Write some data, and read into a buffer shorter than
232                 // available data
233                 for i := 0; i < 128; i += 1 {
234                         out[i] = byte(i)
235                 }
236
237                 writer.Write(out[:100])
238
239                 in := make([]byte, 64)
240                 n, err := br1.Read(in)
241
242                 c.Check(n, Equals, 64)
243                 c.Check(err, Equals, nil)
244
245                 for i := 0; i < 64; i += 1 {
246                         c.Check(in[i], Equals, out[i])
247                 }
248         }
249
250         {
251                 // Write some more data, and read into buffer longer than
252                 // available data
253                 in := make([]byte, 64)
254                 n, err := br1.Read(in)
255                 c.Check(n, Equals, 36)
256                 c.Check(err, Equals, nil)
257
258                 for i := 0; i < 36; i += 1 {
259                         c.Check(in[i], Equals, out[64+i])
260                 }
261
262         }
263
264         {
265                 // Test read before write
266                 type Rd struct {
267                         n   int
268                         err error
269                 }
270                 rd := make(chan Rd)
271                 in := make([]byte, 64)
272
273                 go func() {
274                         n, err := br1.Read(in)
275                         rd <- Rd{n, err}
276                 }()
277
278                 time.Sleep(100 * time.Millisecond)
279                 writer.Write(out[100:])
280
281                 got := <-rd
282
283                 c.Check(got.n, Equals, 28)
284                 c.Check(got.err, Equals, nil)
285
286                 for i := 0; i < 28; i += 1 {
287                         c.Check(in[i], Equals, out[100+i])
288                 }
289         }
290
291         br2 := MakeBufferReader(requests)
292         {
293                 // Test 'catch up' reader
294                 in := make([]byte, 256)
295                 n, err := br2.Read(in)
296
297                 c.Check(n, Equals, 128)
298                 c.Check(err, Equals, nil)
299
300                 for i := 0; i < 128; i += 1 {
301                         c.Check(in[i], Equals, out[i])
302                 }
303         }
304
305         {
306                 // Test closing the reader
307                 writer.Close()
308                 status := <-reader_status
309                 c.Check(status, Equals, io.EOF)
310
311                 in := make([]byte, 256)
312                 n1, err1 := br1.Read(in)
313                 n2, err2 := br2.Read(in)
314                 c.Check(n1, Equals, 0)
315                 c.Check(err1, Equals, io.EOF)
316                 c.Check(n2, Equals, 0)
317                 c.Check(err2, Equals, io.EOF)
318         }
319
320         {
321                 // Test 'catch up' reader after closing
322                 br3 := MakeBufferReader(requests)
323                 in := make([]byte, 256)
324                 n, err := br3.Read(in)
325
326                 c.Check(n, Equals, 128)
327                 c.Check(err, Equals, nil)
328
329                 for i := 0; i < 128; i += 1 {
330                         c.Check(in[i], Equals, out[i])
331                 }
332
333                 n, err = br3.Read(in)
334
335                 c.Check(n, Equals, 0)
336                 c.Check(err, Equals, io.EOF)
337         }
338 }
339
340 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
341         reader, writer := io.Pipe()
342
343         // Buffer for reads from 'r'
344         buffer := make([]byte, 100)
345
346         // Read requests on Transfer() buffer
347         requests := make(chan ReadRequest)
348         defer close(requests)
349
350         // Reporting reader error states
351         reader_status := make(chan error)
352
353         go Transfer(buffer, reader, requests, reader_status)
354
355         out := make([]byte, 101)
356         go writer.Write(out)
357
358         status := <-reader_status
359         c.Check(status, Equals, io.ErrShortBuffer)
360 }
361
362 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
363         // Buffer for reads from 'r'
364         buffer := make([]byte, 100)
365         for i := 0; i < 100; i += 1 {
366                 buffer[i] = byte(i)
367         }
368
369         // Read requests on Transfer() buffer
370         requests := make(chan ReadRequest)
371         defer close(requests)
372
373         go Transfer(buffer, nil, requests, nil)
374
375         br1 := MakeBufferReader(requests)
376
377         in := make([]byte, 64)
378         {
379                 n, err := br1.Read(in)
380
381                 c.Check(n, Equals, 64)
382                 c.Check(err, Equals, nil)
383
384                 for i := 0; i < 64; i += 1 {
385                         c.Check(in[i], Equals, buffer[i])
386                 }
387         }
388         {
389                 n, err := br1.Read(in)
390
391                 c.Check(n, Equals, 36)
392                 c.Check(err, Equals, nil)
393
394                 for i := 0; i < 36; i += 1 {
395                         c.Check(in[i], Equals, buffer[64+i])
396                 }
397         }
398         {
399                 n, err := br1.Read(in)
400
401                 c.Check(n, Equals, 0)
402                 c.Check(err, Equals, io.EOF)
403         }
404 }
405
406 func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
407         // Buffer for reads from 'r'
408         buffer := make([]byte, 100)
409         for i := 0; i < 100; i += 1 {
410                 buffer[i] = byte(i)
411         }
412
413         // Read requests on Transfer() buffer
414         requests := make(chan ReadRequest)
415         defer close(requests)
416
417         go Transfer(buffer, nil, requests, nil)
418
419         br1 := MakeBufferReader(requests)
420
421         reader, writer := io.Pipe()
422
423         go func() {
424                 p := make([]byte, 100)
425                 n, err := reader.Read(p)
426                 c.Check(n, Equals, 100)
427                 c.Check(err, Equals, nil)
428                 c.Check(p, DeepEquals, buffer)
429         }()
430
431         io.Copy(writer, br1)
432 }
433
434 type StubPutHandler struct {
435         c              *C
436         expectPath     string
437         expectApiToken string
438         expectBody     string
439         handled        chan string
440 }
441
442 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
443         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
444         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
445         body, err := ioutil.ReadAll(req.Body)
446         this.c.Check(err, Equals, nil)
447         this.c.Check(body, DeepEquals, []byte(this.expectBody))
448         resp.WriteHeader(200)
449         this.handled <- fmt.Sprintf("http://%s", req.Host)
450 }
451
452 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
453         server := http.Server{Handler: st}
454
455         var err error
456         listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
457         if err != nil {
458                 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
459         }
460
461         url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
462
463         go server.Serve(listener)
464         return listener, url
465 }
466
467 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
468         io.ReadCloser, io.WriteCloser, chan UploadStatus)) {
469
470         listener, url := RunBogusKeepServer(st, 2990)
471         defer listener.Close()
472
473         kc, _ := MakeKeepClient()
474         kc.ApiToken = "abc123"
475
476         reader, writer := io.Pipe()
477         upload_status := make(chan UploadStatus)
478
479         f(kc, url, reader, writer, upload_status)
480 }
481
482 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
483         st := StubPutHandler{
484                 c,
485                 "acbd18db4cc2f85cedef654fccc4a4d8",
486                 "abc123",
487                 "foo",
488                 make(chan string)}
489
490         UploadToStubHelper(c, st,
491                 func(kc *KeepClient, url string, reader io.ReadCloser,
492                         writer io.WriteCloser, upload_status chan UploadStatus) {
493
494                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
495
496                         writer.Write([]byte("foo"))
497                         writer.Close()
498
499                         <-st.handled
500                         status := <-upload_status
501                         c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
502                 })
503 }
504
505 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
506         st := StubPutHandler{
507                 c,
508                 "acbd18db4cc2f85cedef654fccc4a4d8",
509                 "abc123",
510                 "foo",
511                 make(chan string)}
512
513         UploadToStubHelper(c, st,
514                 func(kc *KeepClient, url string, reader io.ReadCloser,
515                         writer io.WriteCloser, upload_status chan UploadStatus) {
516
517                         // Buffer for reads from 'r'
518                         buffer := make([]byte, 512)
519
520                         // Read requests on Transfer() buffer
521                         requests := make(chan ReadRequest)
522                         defer close(requests)
523
524                         // Reporting reader error states
525                         reader_status := make(chan error)
526
527                         go Transfer(buffer, reader, requests, reader_status)
528
529                         br1 := MakeBufferReader(requests)
530
531                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
532
533                         writer.Write([]byte("foo"))
534                         writer.Close()
535
536                         <-reader_status
537                         <-st.handled
538
539                         status := <-upload_status
540                         c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
541
542                         //c.Check(true, Equals, false)
543                 })
544 }
545
546 type FailHandler struct {
547         handled chan string
548 }
549
550 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
551         resp.WriteHeader(400)
552         this.handled <- fmt.Sprintf("http://%s", req.Host)
553 }
554
555 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
556         st := FailHandler{
557                 make(chan string)}
558
559         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
560
561         UploadToStubHelper(c, st,
562                 func(kc *KeepClient, url string, reader io.ReadCloser,
563                         writer io.WriteCloser, upload_status chan UploadStatus) {
564
565                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
566
567                         writer.Write([]byte("foo"))
568                         writer.Close()
569
570                         <-st.handled
571
572                         status := <-upload_status
573                         c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
574                         c.Check(status.StatusCode, Equals, 400)
575                 })
576
577 }
578
579 type KeepServer struct {
580         listener net.Listener
581         url      string
582 }
583
584 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
585         ks = make([]KeepServer, n)
586
587         for i := 0; i < n; i += 1 {
588                 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
589                 ks[i] = KeepServer{boguslistener, bogusurl}
590         }
591
592         return ks
593 }
594
595 func (s *StandaloneSuite) TestPutB(c *C) {
596         log.Printf("TestPutB")
597
598         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
599
600         st := StubPutHandler{
601                 c,
602                 hash,
603                 "abc123",
604                 "foo",
605                 make(chan string, 2)}
606
607         kc, _ := MakeKeepClient()
608
609         kc.Want_replicas = 2
610         kc.ApiToken = "abc123"
611         kc.Service_roots = make([]string, 5)
612
613         ks := RunSomeFakeKeepServers(st, 5, 2990)
614
615         for i := 0; i < len(ks); i += 1 {
616                 kc.Service_roots[i] = ks[i].url
617                 defer ks[i].listener.Close()
618         }
619
620         sort.Strings(kc.Service_roots)
621
622         kc.PutB([]byte("foo"))
623
624         shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
625
626         c.Check(<-st.handled, Equals, shuff[0])
627         c.Check(<-st.handled, Equals, shuff[1])
628 }
629
630 func (s *StandaloneSuite) TestPutHR(c *C) {
631         log.Printf("TestPutHR")
632
633         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
634
635         st := StubPutHandler{
636                 c,
637                 hash,
638                 "abc123",
639                 "foo",
640                 make(chan string, 2)}
641
642         kc, _ := MakeKeepClient()
643
644         kc.Want_replicas = 2
645         kc.ApiToken = "abc123"
646         kc.Service_roots = make([]string, 5)
647
648         ks := RunSomeFakeKeepServers(st, 5, 2990)
649
650         for i := 0; i < len(ks); i += 1 {
651                 kc.Service_roots[i] = ks[i].url
652                 defer ks[i].listener.Close()
653         }
654
655         sort.Strings(kc.Service_roots)
656
657         reader, writer := io.Pipe()
658
659         go func() {
660                 writer.Write([]byte("foo"))
661                 writer.Close()
662         }()
663
664         kc.PutHR(hash, reader, 3)
665
666         shuff := kc.ShuffledServiceRoots(hash)
667         log.Print(shuff)
668
669         s1 := <-st.handled
670         s2 := <-st.handled
671
672         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
673                 (s1 == shuff[1] && s2 == shuff[0]),
674                 Equals,
675                 true)
676 }
677
678 func (s *StandaloneSuite) TestPutWithFail(c *C) {
679         log.Printf("TestPutWithFail")
680
681         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
682
683         st := StubPutHandler{
684                 c,
685                 hash,
686                 "abc123",
687                 "foo",
688                 make(chan string, 2)}
689
690         fh := FailHandler{
691                 make(chan string, 1)}
692
693         kc, _ := MakeKeepClient()
694
695         kc.Want_replicas = 2
696         kc.ApiToken = "abc123"
697         kc.Service_roots = make([]string, 5)
698
699         ks1 := RunSomeFakeKeepServers(st, 4, 2990)
700         ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
701
702         for i, k := range ks1 {
703                 kc.Service_roots[i] = k.url
704                 defer k.listener.Close()
705         }
706         for i, k := range ks2 {
707                 kc.Service_roots[len(ks1)+i] = k.url
708                 defer k.listener.Close()
709         }
710
711         sort.Strings(kc.Service_roots)
712
713         shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
714
715         phash, replicas, err := kc.PutB([]byte("foo"))
716
717         <-fh.handled
718
719         c.Check(err, Equals, nil)
720         c.Check(phash, Equals, hash)
721         c.Check(replicas, Equals, 2)
722         c.Check(<-st.handled, Equals, shuff[1])
723         c.Check(<-st.handled, Equals, shuff[2])
724 }
725
726 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
727         log.Printf("TestPutWithTooManyFail")
728
729         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
730
731         st := StubPutHandler{
732                 c,
733                 hash,
734                 "abc123",
735                 "foo",
736                 make(chan string, 1)}
737
738         fh := FailHandler{
739                 make(chan string, 4)}
740
741         kc, _ := MakeKeepClient()
742
743         kc.Want_replicas = 2
744         kc.ApiToken = "abc123"
745         kc.Service_roots = make([]string, 5)
746
747         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
748         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
749
750         for i, k := range ks1 {
751                 kc.Service_roots[i] = k.url
752                 defer k.listener.Close()
753         }
754         for i, k := range ks2 {
755                 kc.Service_roots[len(ks1)+i] = k.url
756                 defer k.listener.Close()
757         }
758
759         sort.Strings(kc.Service_roots)
760
761         shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
762
763         _, replicas, err := kc.PutB([]byte("foo"))
764
765         c.Check(err, Equals, InsufficientReplicasError)
766         c.Check(replicas, Equals, 1)
767         c.Check(<-st.handled, Equals, shuff[1])
768 }
769
770 type StubGetHandler struct {
771         c              *C
772         expectPath     string
773         expectApiToken string
774         returnBody     []byte
775 }
776
777 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
778         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
779         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
780         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
781         resp.Write(this.returnBody)
782 }
783
784 func (s *StandaloneSuite) TestGet(c *C) {
785
786         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
787
788         st := StubGetHandler{
789                 c,
790                 hash,
791                 "abc123",
792                 []byte("foo")}
793
794         listener, url := RunBogusKeepServer(st, 2990)
795         defer listener.Close()
796
797         kc, _ := MakeKeepClient()
798         kc.ApiToken = "abc123"
799         kc.Service_roots = []string{url}
800
801         r, n, url2, err := kc.Get(hash)
802         c.Check(err, Equals, nil)
803         c.Check(n, Equals, int64(3))
804         c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
805
806         content, err2 := ioutil.ReadAll(r)
807         c.Check(err2, Equals, nil)
808         c.Check(content, DeepEquals, []byte("foo"))
809 }
810
811 func (s *StandaloneSuite) TestGetFail(c *C) {
812         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
813
814         st := FailHandler{make(chan string, 1)}
815
816         listener, url := RunBogusKeepServer(st, 2990)
817         defer listener.Close()
818
819         kc, _ := MakeKeepClient()
820         kc.ApiToken = "abc123"
821         kc.Service_roots = []string{url}
822
823         r, n, url2, err := kc.Get(hash)
824         c.Check(err, Equals, BlockNotFound)
825         c.Check(n, Equals, int64(0))
826         c.Check(url2, Equals, "")
827         c.Check(r, Equals, nil)
828 }
829
830 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
831
832         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
833
834         fh := FailHandler{
835                 make(chan string, 1)}
836
837         st := StubGetHandler{
838                 c,
839                 hash,
840                 "abc123",
841                 []byte("foo")}
842
843         kc, _ := MakeKeepClient()
844         kc.ApiToken = "abc123"
845         kc.Service_roots = make([]string, 5)
846
847         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
848         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
849
850         for i, k := range ks1 {
851                 kc.Service_roots[i] = k.url
852                 defer k.listener.Close()
853         }
854         for i, k := range ks2 {
855                 kc.Service_roots[len(ks1)+i] = k.url
856                 defer k.listener.Close()
857         }
858
859         sort.Strings(kc.Service_roots)
860
861         r, n, url2, err := kc.Get(hash)
862         <-fh.handled
863         c.Check(err, Equals, nil)
864         c.Check(n, Equals, int64(3))
865         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
866
867         content, err2 := ioutil.ReadAll(r)
868         c.Check(err2, Equals, nil)
869         c.Check(content, DeepEquals, []byte("foo"))
870 }
871
872 func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
873         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
874         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
875         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
876
877         kc, err := MakeKeepClient()
878         c.Assert(err, Equals, nil)
879
880         hash, replicas, err := kc.PutB([]byte("foo"))
881         c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
882         c.Check(replicas, Equals, 2)
883         c.Check(err, Equals, nil)
884
885         r, n, url2, err := kc.Get(hash)
886         c.Check(err, Equals, nil)
887         c.Check(n, Equals, int64(3))
888         c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
889
890         content, err2 := ioutil.ReadAll(r)
891         c.Check(err2, Equals, nil)
892         c.Check(content, DeepEquals, []byte("foo"))
893 }