Merge remote-tracking branch 'origin/master' into 1885-keep-proxy refs #1885
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "arvados.org/streamer"
5         "crypto/md5"
6         "flag"
7         "fmt"
8         . "gopkg.in/check.v1"
9         "io"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/exec"
16         "sort"
17         "strings"
18         "testing"
19 )
20
// Test is the single "go test" entry point for this package; it hands the
// *testing.T to gocheck's TestingT (gocheck boilerplate).
func Test(t *testing.T) {
	TestingT(t)
}
25
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
29
// no_server is the -no-server command line flag. When it is set,
// ServerRequiredSuite.SetUpSuite skips that suite instead of starting the
// test servers.
var no_server = flag.Bool("no-server", false, "Skip 'ServerRequiredSuite'")
31
// Suite types. Both are empty structs; they exist only so that gocheck can
// group the test methods declared on them.
type (
	// ServerRequiredSuite holds the tests that require the Keep server
	// running.
	ServerRequiredSuite struct{}

	// StandaloneSuite holds the standalone tests.
	StandaloneSuite struct{}
)
37
// pythonDir returns "<first GOPATH entry>/../python". GOPATH is read from the
// environment and only the part before the first ':' is used.
func pythonDir() string {
	root := os.Getenv("GOPATH")
	if sep := strings.Index(root, ":"); sep >= 0 {
		root = root[:sep]
	}
	return fmt.Sprintf("%s/../python", root)
}
42
43 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44         if *no_server {
45                 c.Skip("Skipping tests that require server")
46         } else {
47                 os.Chdir(pythonDir())
48                 exec.Command("python", "run_test_server.py", "start").Run()
49                 exec.Command("python", "run_test_server.py", "start_keep").Run()
50         }
51 }
52
53 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
54         os.Chdir(pythonDir())
55         exec.Command("python", "run_test_server.py", "stop_keep").Run()
56         exec.Command("python", "run_test_server.py", "stop").Run()
57 }
58
59 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
60         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
61         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
62         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
63
64         kc, err := MakeKeepClient()
65         c.Check(kc.ApiServer, Equals, "localhost:3001")
66         c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
67         c.Check(kc.ApiInsecure, Equals, false)
68
69         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
70
71         kc, err = MakeKeepClient()
72         c.Check(kc.ApiServer, Equals, "localhost:3001")
73         c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
74         c.Check(kc.ApiInsecure, Equals, true)
75         c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
76
77         c.Assert(err, Equals, nil)
78         c.Check(len(kc.Service_roots), Equals, 2)
79         c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
80         c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
81 }
82
83 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
84         kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
85
86         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
87         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
88         c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
89
90         // "bar" 37b51d194a7513e45b56f6524f2d51f2
91         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
92         c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
93 }
94
95 type StubPutHandler struct {
96         c              *C
97         expectPath     string
98         expectApiToken string
99         expectBody     string
100         handled        chan string
101 }
102
103 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
104         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
105         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
106         body, err := ioutil.ReadAll(req.Body)
107         this.c.Check(err, Equals, nil)
108         this.c.Check(body, DeepEquals, []byte(this.expectBody))
109         resp.WriteHeader(200)
110         this.handled <- fmt.Sprintf("http://%s", req.Host)
111 }
112
// RunBogusKeepServer listens on the given TCP port (no specific address) and
// serves st on it in a background goroutine.
//
// It returns the listener, which the caller should Close to stop accepting
// connections, and the URL "http://localhost:<port>" built from the port
// argument. It panics if the port cannot be opened; the panic message
// includes the underlying error so the cause (for example a port that is
// still in use) is visible.
func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
	tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: port})
	if err != nil {
		panic(fmt.Sprintf("Could not listen on tcp port %v: %v", port, err))
	}

	// http.Serve returns once the listener is closed; its error is
	// intentionally not inspected here.
	go http.Serve(tcpListener, st)

	return tcpListener, fmt.Sprintf("http://localhost:%d", port)
}
125
126 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
127         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
128
129         listener, url := RunBogusKeepServer(st, 2990)
130         defer listener.Close()
131
132         kc, _ := MakeKeepClient()
133         kc.ApiToken = "abc123"
134
135         reader, writer := io.Pipe()
136         upload_status := make(chan uploadStatus)
137
138         f(kc, url, reader, writer, upload_status)
139 }
140
141 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
142         log.Printf("TestUploadToStubKeepServer")
143
144         st := StubPutHandler{
145                 c,
146                 "acbd18db4cc2f85cedef654fccc4a4d8",
147                 "abc123",
148                 "foo",
149                 make(chan string)}
150
151         UploadToStubHelper(c, st,
152                 func(kc KeepClient, url string, reader io.ReadCloser,
153                         writer io.WriteCloser, upload_status chan uploadStatus) {
154
155                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
156
157                         writer.Write([]byte("foo"))
158                         writer.Close()
159
160                         <-st.handled
161                         status := <-upload_status
162                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
163                 })
164
165         log.Printf("TestUploadToStubKeepServer done")
166 }
167
168 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
169         log.Printf("TestUploadToStubKeepServerBufferReader")
170
171         st := StubPutHandler{
172                 c,
173                 "acbd18db4cc2f85cedef654fccc4a4d8",
174                 "abc123",
175                 "foo",
176                 make(chan string)}
177
178         UploadToStubHelper(c, st,
179                 func(kc KeepClient, url string, reader io.ReadCloser,
180                         writer io.WriteCloser, upload_status chan uploadStatus) {
181
182                         tr := streamer.AsyncStreamFromReader(512, reader)
183                         defer tr.Close()
184
185                         br1 := tr.MakeStreamReader()
186
187                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
188
189                         writer.Write([]byte("foo"))
190                         writer.Close()
191
192                         <-st.handled
193
194                         status := <-upload_status
195                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
196                 })
197
198         log.Printf("TestUploadToStubKeepServerBufferReader done")
199 }
200
// FailHandler is an http.Handler that fails every request with status 500.
type FailHandler struct {
	handled chan string // receives "http://<req.Host>" per request
}

// ServeHTTP replies with status 500 and then sends "http://<req.Host>" on the
// handled channel.
func (h FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	servedBy := "http://" + req.Host
	h.handled <- servedBy
}
209
210 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
211         log.Printf("TestFailedUploadToStubKeepServer")
212
213         st := FailHandler{
214                 make(chan string)}
215
216         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
217
218         UploadToStubHelper(c, st,
219                 func(kc KeepClient, url string, reader io.ReadCloser,
220                         writer io.WriteCloser, upload_status chan uploadStatus) {
221
222                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
223
224                         writer.Write([]byte("foo"))
225                         writer.Close()
226
227                         <-st.handled
228
229                         status := <-upload_status
230                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
231                         c.Check(status.statusCode, Equals, 500)
232                 })
233         log.Printf("TestFailedUploadToStubKeepServer done")
234 }
235
236 type KeepServer struct {
237         listener net.Listener
238         url      string
239 }
240
241 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
242         ks = make([]KeepServer, n)
243
244         for i := 0; i < n; i += 1 {
245                 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
246                 ks[i] = KeepServer{boguslistener, bogusurl}
247         }
248
249         return ks
250 }
251
252 func (s *StandaloneSuite) TestPutB(c *C) {
253         log.Printf("TestPutB")
254
255         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
256
257         st := StubPutHandler{
258                 c,
259                 hash,
260                 "abc123",
261                 "foo",
262                 make(chan string, 2)}
263
264         kc, _ := MakeKeepClient()
265
266         kc.Want_replicas = 2
267         kc.ApiToken = "abc123"
268         kc.Service_roots = make([]string, 5)
269
270         ks := RunSomeFakeKeepServers(st, 5, 2990)
271
272         for i := 0; i < len(ks); i += 1 {
273                 kc.Service_roots[i] = ks[i].url
274                 defer ks[i].listener.Close()
275         }
276
277         sort.Strings(kc.Service_roots)
278
279         kc.PutB([]byte("foo"))
280
281         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
282
283         s1 := <-st.handled
284         s2 := <-st.handled
285         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
286                 (s1 == shuff[1] && s2 == shuff[0]),
287                 Equals,
288                 true)
289
290         log.Printf("TestPutB done")
291 }
292
293 func (s *StandaloneSuite) TestPutHR(c *C) {
294         log.Printf("TestPutHR")
295
296         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
297
298         st := StubPutHandler{
299                 c,
300                 hash,
301                 "abc123",
302                 "foo",
303                 make(chan string, 2)}
304
305         kc, _ := MakeKeepClient()
306
307         kc.Want_replicas = 2
308         kc.ApiToken = "abc123"
309         kc.Service_roots = make([]string, 5)
310
311         ks := RunSomeFakeKeepServers(st, 5, 2990)
312
313         for i := 0; i < len(ks); i += 1 {
314                 kc.Service_roots[i] = ks[i].url
315                 defer ks[i].listener.Close()
316         }
317
318         sort.Strings(kc.Service_roots)
319
320         reader, writer := io.Pipe()
321
322         go func() {
323                 writer.Write([]byte("foo"))
324                 writer.Close()
325         }()
326
327         kc.PutHR(hash, reader, 3)
328
329         shuff := kc.shuffledServiceRoots(hash)
330         log.Print(shuff)
331
332         s1 := <-st.handled
333         s2 := <-st.handled
334
335         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
336                 (s1 == shuff[1] && s2 == shuff[0]),
337                 Equals,
338                 true)
339
340         log.Printf("TestPutHR done")
341 }
342
343 func (s *StandaloneSuite) TestPutWithFail(c *C) {
344         log.Printf("TestPutWithFail")
345
346         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
347
348         st := StubPutHandler{
349                 c,
350                 hash,
351                 "abc123",
352                 "foo",
353                 make(chan string, 2)}
354
355         fh := FailHandler{
356                 make(chan string, 1)}
357
358         kc, _ := MakeKeepClient()
359
360         kc.Want_replicas = 2
361         kc.ApiToken = "abc123"
362         kc.Service_roots = make([]string, 5)
363
364         ks1 := RunSomeFakeKeepServers(st, 4, 2990)
365         ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
366
367         for i, k := range ks1 {
368                 kc.Service_roots[i] = k.url
369                 defer k.listener.Close()
370         }
371         for i, k := range ks2 {
372                 kc.Service_roots[len(ks1)+i] = k.url
373                 defer k.listener.Close()
374         }
375
376         sort.Strings(kc.Service_roots)
377
378         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
379
380         phash, replicas, err := kc.PutB([]byte("foo"))
381
382         <-fh.handled
383
384         c.Check(err, Equals, nil)
385         c.Check(phash, Equals, hash)
386         c.Check(replicas, Equals, 2)
387         c.Check(<-st.handled, Equals, shuff[1])
388         c.Check(<-st.handled, Equals, shuff[2])
389 }
390
391 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
392         log.Printf("TestPutWithTooManyFail")
393
394         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
395
396         st := StubPutHandler{
397                 c,
398                 hash,
399                 "abc123",
400                 "foo",
401                 make(chan string, 1)}
402
403         fh := FailHandler{
404                 make(chan string, 4)}
405
406         kc, _ := MakeKeepClient()
407
408         kc.Want_replicas = 2
409         kc.ApiToken = "abc123"
410         kc.Service_roots = make([]string, 5)
411
412         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
413         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
414
415         for i, k := range ks1 {
416                 kc.Service_roots[i] = k.url
417                 defer k.listener.Close()
418         }
419         for i, k := range ks2 {
420                 kc.Service_roots[len(ks1)+i] = k.url
421                 defer k.listener.Close()
422         }
423
424         sort.Strings(kc.Service_roots)
425
426         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
427
428         _, replicas, err := kc.PutB([]byte("foo"))
429
430         c.Check(err, Equals, InsufficientReplicasError)
431         c.Check(replicas, Equals, 1)
432         c.Check(<-st.handled, Equals, shuff[1])
433
434         log.Printf("TestPutWithTooManyFail done")
435 }
436
437 type StubGetHandler struct {
438         c              *C
439         expectPath     string
440         expectApiToken string
441         returnBody     []byte
442 }
443
444 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
445         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
446         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
447         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
448         resp.Write(this.returnBody)
449 }
450
451 func (s *StandaloneSuite) TestGet(c *C) {
452         log.Printf("TestGet")
453
454         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
455
456         st := StubGetHandler{
457                 c,
458                 hash,
459                 "abc123",
460                 []byte("foo")}
461
462         listener, url := RunBogusKeepServer(st, 2990)
463         defer listener.Close()
464
465         kc, _ := MakeKeepClient()
466         kc.ApiToken = "abc123"
467         kc.Service_roots = []string{url}
468
469         r, n, url2, err := kc.Get(hash)
470         defer r.Close()
471         c.Check(err, Equals, nil)
472         c.Check(n, Equals, int64(3))
473         c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
474
475         content, err2 := ioutil.ReadAll(r)
476         c.Check(err2, Equals, nil)
477         c.Check(content, DeepEquals, []byte("foo"))
478
479         log.Printf("TestGet done")
480 }
481
482 func (s *StandaloneSuite) TestGetFail(c *C) {
483         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
484
485         st := FailHandler{make(chan string, 1)}
486
487         listener, url := RunBogusKeepServer(st, 2990)
488         defer listener.Close()
489
490         kc, _ := MakeKeepClient()
491         kc.ApiToken = "abc123"
492         kc.Service_roots = []string{url}
493
494         r, n, url2, err := kc.Get(hash)
495         c.Check(err, Equals, BlockNotFound)
496         c.Check(n, Equals, int64(0))
497         c.Check(url2, Equals, "")
498         c.Check(r, Equals, nil)
499 }
500
// BarHandler is an http.Handler that answers every request with the body
// "bar".
type BarHandler struct {
	handled chan string // receives "http://<req.Host>" per request
}

// ServeHTTP writes "bar" as the response body and then sends
// "http://<req.Host>" on the handled channel.
func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.Write([]byte("bar"))
	servedBy := "http://" + req.Host
	h.handled <- servedBy
}
509
510 func (s *StandaloneSuite) TestChecksum(c *C) {
511         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
512         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
513
514         st := BarHandler{make(chan string, 1)}
515
516         listener, url := RunBogusKeepServer(st, 2990)
517         defer listener.Close()
518
519         kc, _ := MakeKeepClient()
520         kc.ApiToken = "abc123"
521         kc.Service_roots = []string{url}
522
523         r, n, _, err := kc.Get(barhash)
524         _, err = ioutil.ReadAll(r)
525         c.Check(n, Equals, int64(3))
526         c.Check(err, Equals, nil)
527
528         <-st.handled
529
530         r, n, _, err = kc.Get(foohash)
531         _, err = ioutil.ReadAll(r)
532         c.Check(n, Equals, int64(3))
533         c.Check(err, Equals, BadChecksum)
534
535         <-st.handled
536 }
537
538 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
539
540         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
541
542         fh := FailHandler{
543                 make(chan string, 1)}
544
545         st := StubGetHandler{
546                 c,
547                 hash,
548                 "abc123",
549                 []byte("foo")}
550
551         kc, _ := MakeKeepClient()
552         kc.ApiToken = "abc123"
553         kc.Service_roots = make([]string, 5)
554
555         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
556         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
557
558         for i, k := range ks1 {
559                 kc.Service_roots[i] = k.url
560                 defer k.listener.Close()
561         }
562         for i, k := range ks2 {
563                 kc.Service_roots[len(ks1)+i] = k.url
564                 defer k.listener.Close()
565         }
566
567         sort.Strings(kc.Service_roots)
568
569         r, n, url2, err := kc.Get(hash)
570         <-fh.handled
571         c.Check(err, Equals, nil)
572         c.Check(n, Equals, int64(3))
573         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
574
575         content, err2 := ioutil.ReadAll(r)
576         c.Check(err2, Equals, nil)
577         c.Check(content, DeepEquals, []byte("foo"))
578 }
579
580 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
581         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
582         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
583         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
584
585         kc, err := MakeKeepClient()
586         c.Assert(err, Equals, nil)
587
588         hash, replicas, err := kc.PutB([]byte("foo"))
589         c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
590         c.Check(replicas, Equals, 2)
591         c.Check(err, Equals, nil)
592
593         {
594                 r, n, url2, err := kc.Get(hash)
595                 c.Check(err, Equals, nil)
596                 c.Check(n, Equals, int64(3))
597                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
598
599                 content, err2 := ioutil.ReadAll(r)
600                 c.Check(err2, Equals, nil)
601                 c.Check(content, DeepEquals, []byte("foo"))
602         }
603
604         {
605                 n, url2, err := kc.Ask(hash)
606                 c.Check(err, Equals, nil)
607                 c.Check(n, Equals, int64(3))
608                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
609         }
610 }
611
// StubProxyHandler is an http.Handler that answers every request with the
// response header X-Keep-Replicas-Stored set to "2".
type StubProxyHandler struct {
	handled chan string // receives "http://<req.Host>" per request
}

// ServeHTTP sets X-Keep-Replicas-Stored to "2" on the response and then sends
// "http://<req.Host>" on the handled channel. It writes no body and no
// explicit status.
func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.Header().Set("X-Keep-Replicas-Stored", "2")
	servedBy := "http://" + req.Host
	h.handled <- servedBy
}
620
621 func (s *StandaloneSuite) TestPutProxy(c *C) {
622         log.Printf("TestPutProxy")
623
624         st := StubProxyHandler{make(chan string, 1)}
625
626         kc, _ := MakeKeepClient()
627
628         kc.Want_replicas = 2
629         kc.Using_proxy = true
630         kc.ApiToken = "abc123"
631         kc.Service_roots = make([]string, 1)
632
633         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
634
635         for i, k := range ks1 {
636                 kc.Service_roots[i] = k.url
637                 defer k.listener.Close()
638         }
639
640         _, replicas, err := kc.PutB([]byte("foo"))
641         <-st.handled
642
643         c.Check(err, Equals, nil)
644         c.Check(replicas, Equals, 2)
645
646         log.Printf("TestPutProxy done")
647 }
648
649 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
650         log.Printf("TestPutProxy")
651
652         st := StubProxyHandler{make(chan string, 1)}
653
654         kc, _ := MakeKeepClient()
655
656         kc.Want_replicas = 3
657         kc.Using_proxy = true
658         kc.ApiToken = "abc123"
659         kc.Service_roots = make([]string, 1)
660
661         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
662
663         for i, k := range ks1 {
664                 kc.Service_roots[i] = k.url
665                 defer k.listener.Close()
666         }
667
668         _, replicas, err := kc.PutB([]byte("foo"))
669         <-st.handled
670
671         c.Check(err, Equals, InsufficientReplicasError)
672         c.Check(replicas, Equals, 2)
673
674         log.Printf("TestPutProxy done")
675 }