1885: Made ServiceRoots atomically updatable, so that KeepProxy can poll for
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "arvados.org/streamer"
5         "crypto/md5"
6         "flag"
7         "fmt"
8         . "gopkg.in/check.v1"
9         "io"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/exec"
16         "strings"
17         "testing"
18 )
19
20 // Gocheck boilerplate
21 func Test(t *testing.T) {
22         TestingT(t)
23 }
24
25 // Gocheck boilerplate
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
28
29 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
30
31 // Tests that require the Keep server running
32 type ServerRequiredSuite struct{}
33
34 // Standalone tests
35 type StandaloneSuite struct{}
36
37 func pythonDir() string {
38         gopath := os.Getenv("GOPATH")
39         return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
40 }
41
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
43         if *no_server {
44                 c.Skip("Skipping tests that require server")
45         } else {
46                 os.Chdir(pythonDir())
47                 exec.Command("python", "run_test_server.py", "start").Run()
48                 exec.Command("python", "run_test_server.py", "start_keep").Run()
49         }
50 }
51
52 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
53         os.Chdir(pythonDir())
54         exec.Command("python", "run_test_server.py", "stop_keep").Run()
55         exec.Command("python", "run_test_server.py", "stop").Run()
56 }
57
58 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
59         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
60         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
61         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
62
63         kc, err := MakeKeepClient()
64         c.Check(kc.ApiServer, Equals, "localhost:3001")
65         c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
66         c.Check(kc.ApiInsecure, Equals, false)
67
68         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
69
70         kc, err = MakeKeepClient()
71         c.Check(kc.ApiServer, Equals, "localhost:3001")
72         c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
73         c.Check(kc.ApiInsecure, Equals, true)
74         c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
75
76         c.Assert(err, Equals, nil)
77         c.Check(len(kc.ServiceRoots()), Equals, 2)
78         c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
79         c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
80 }
81
82 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
83         kc := KeepClient{}
84         kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
85
86         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
87         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
88         c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
89
90         // "bar" 37b51d194a7513e45b56f6524f2d51f2
91         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
92         c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
93 }
94
95 type StubPutHandler struct {
96         c              *C
97         expectPath     string
98         expectApiToken string
99         expectBody     string
100         handled        chan string
101 }
102
103 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
104         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
105         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
106         body, err := ioutil.ReadAll(req.Body)
107         this.c.Check(err, Equals, nil)
108         this.c.Check(body, DeepEquals, []byte(this.expectBody))
109         resp.WriteHeader(200)
110         this.handled <- fmt.Sprintf("http://%s", req.Host)
111 }
112
113 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
114         var err error
115         listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
116         if err != nil {
117                 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
118         }
119
120         url = fmt.Sprintf("http://localhost:%d", port)
121
122         go http.Serve(listener, st)
123         return listener, url
124 }
125
126 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
127         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
128
129         listener, url := RunBogusKeepServer(st, 2990)
130         defer listener.Close()
131
132         kc, _ := MakeKeepClient()
133         kc.ApiToken = "abc123"
134
135         reader, writer := io.Pipe()
136         upload_status := make(chan uploadStatus)
137
138         f(kc, url, reader, writer, upload_status)
139 }
140
141 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
142         log.Printf("TestUploadToStubKeepServer")
143
144         st := StubPutHandler{
145                 c,
146                 "acbd18db4cc2f85cedef654fccc4a4d8",
147                 "abc123",
148                 "foo",
149                 make(chan string)}
150
151         UploadToStubHelper(c, st,
152                 func(kc KeepClient, url string, reader io.ReadCloser,
153                         writer io.WriteCloser, upload_status chan uploadStatus) {
154
155                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
156
157                         writer.Write([]byte("foo"))
158                         writer.Close()
159
160                         <-st.handled
161                         status := <-upload_status
162                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
163                 })
164
165         log.Printf("TestUploadToStubKeepServer done")
166 }
167
168 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
169         log.Printf("TestUploadToStubKeepServerBufferReader")
170
171         st := StubPutHandler{
172                 c,
173                 "acbd18db4cc2f85cedef654fccc4a4d8",
174                 "abc123",
175                 "foo",
176                 make(chan string)}
177
178         UploadToStubHelper(c, st,
179                 func(kc KeepClient, url string, reader io.ReadCloser,
180                         writer io.WriteCloser, upload_status chan uploadStatus) {
181
182                         tr := streamer.AsyncStreamFromReader(512, reader)
183                         defer tr.Close()
184
185                         br1 := tr.MakeStreamReader()
186
187                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
188
189                         writer.Write([]byte("foo"))
190                         writer.Close()
191
192                         <-st.handled
193
194                         status := <-upload_status
195                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
196                 })
197
198         log.Printf("TestUploadToStubKeepServerBufferReader done")
199 }
200
201 type FailHandler struct {
202         handled chan string
203 }
204
205 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
206         resp.WriteHeader(500)
207         this.handled <- fmt.Sprintf("http://%s", req.Host)
208 }
209
210 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
211         log.Printf("TestFailedUploadToStubKeepServer")
212
213         st := FailHandler{
214                 make(chan string)}
215
216         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
217
218         UploadToStubHelper(c, st,
219                 func(kc KeepClient, url string, reader io.ReadCloser,
220                         writer io.WriteCloser, upload_status chan uploadStatus) {
221
222                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
223
224                         writer.Write([]byte("foo"))
225                         writer.Close()
226
227                         <-st.handled
228
229                         status := <-upload_status
230                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
231                         c.Check(status.statusCode, Equals, 500)
232                 })
233         log.Printf("TestFailedUploadToStubKeepServer done")
234 }
235
236 type KeepServer struct {
237         listener net.Listener
238         url      string
239 }
240
241 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
242         ks = make([]KeepServer, n)
243
244         for i := 0; i < n; i += 1 {
245                 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
246                 ks[i] = KeepServer{boguslistener, bogusurl}
247         }
248
249         return ks
250 }
251
252 func (s *StandaloneSuite) TestPutB(c *C) {
253         log.Printf("TestPutB")
254
255         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
256
257         st := StubPutHandler{
258                 c,
259                 hash,
260                 "abc123",
261                 "foo",
262                 make(chan string, 2)}
263
264         kc, _ := MakeKeepClient()
265
266         kc.Want_replicas = 2
267         kc.ApiToken = "abc123"
268         service_roots := make([]string, 5)
269
270         ks := RunSomeFakeKeepServers(st, 5, 2990)
271
272         for i := 0; i < len(ks); i += 1 {
273                 service_roots[i] = ks[i].url
274                 defer ks[i].listener.Close()
275         }
276
277         kc.SetServiceRoots(service_roots)
278
279         kc.PutB([]byte("foo"))
280
281         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
282
283         s1 := <-st.handled
284         s2 := <-st.handled
285         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
286                 (s1 == shuff[1] && s2 == shuff[0]),
287                 Equals,
288                 true)
289
290         log.Printf("TestPutB done")
291 }
292
293 func (s *StandaloneSuite) TestPutHR(c *C) {
294         log.Printf("TestPutHR")
295
296         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
297
298         st := StubPutHandler{
299                 c,
300                 hash,
301                 "abc123",
302                 "foo",
303                 make(chan string, 2)}
304
305         kc, _ := MakeKeepClient()
306
307         kc.Want_replicas = 2
308         kc.ApiToken = "abc123"
309         service_roots := make([]string, 5)
310
311         ks := RunSomeFakeKeepServers(st, 5, 2990)
312
313         for i := 0; i < len(ks); i += 1 {
314                 service_roots[i] = ks[i].url
315                 defer ks[i].listener.Close()
316         }
317
318         kc.SetServiceRoots(service_roots)
319
320         reader, writer := io.Pipe()
321
322         go func() {
323                 writer.Write([]byte("foo"))
324                 writer.Close()
325         }()
326
327         kc.PutHR(hash, reader, 3)
328
329         shuff := kc.shuffledServiceRoots(hash)
330         log.Print(shuff)
331
332         s1 := <-st.handled
333         s2 := <-st.handled
334
335         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
336                 (s1 == shuff[1] && s2 == shuff[0]),
337                 Equals,
338                 true)
339
340         log.Printf("TestPutHR done")
341 }
342
343 func (s *StandaloneSuite) TestPutWithFail(c *C) {
344         log.Printf("TestPutWithFail")
345
346         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
347
348         st := StubPutHandler{
349                 c,
350                 hash,
351                 "abc123",
352                 "foo",
353                 make(chan string, 2)}
354
355         fh := FailHandler{
356                 make(chan string, 1)}
357
358         kc, _ := MakeKeepClient()
359
360         kc.Want_replicas = 2
361         kc.ApiToken = "abc123"
362         service_roots := make([]string, 5)
363
364         ks1 := RunSomeFakeKeepServers(st, 4, 2990)
365         ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
366
367         for i, k := range ks1 {
368                 service_roots[i] = k.url
369                 defer k.listener.Close()
370         }
371         for i, k := range ks2 {
372                 service_roots[len(ks1)+i] = k.url
373                 defer k.listener.Close()
374         }
375
376         kc.SetServiceRoots(service_roots)
377
378         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
379
380         phash, replicas, err := kc.PutB([]byte("foo"))
381
382         <-fh.handled
383
384         c.Check(err, Equals, nil)
385         c.Check(phash, Equals, hash)
386         c.Check(replicas, Equals, 2)
387         c.Check(<-st.handled, Equals, shuff[1])
388         c.Check(<-st.handled, Equals, shuff[2])
389 }
390
391 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
392         log.Printf("TestPutWithTooManyFail")
393
394         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
395
396         st := StubPutHandler{
397                 c,
398                 hash,
399                 "abc123",
400                 "foo",
401                 make(chan string, 1)}
402
403         fh := FailHandler{
404                 make(chan string, 4)}
405
406         kc, _ := MakeKeepClient()
407
408         kc.Want_replicas = 2
409         kc.ApiToken = "abc123"
410         service_roots := make([]string, 5)
411
412         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
413         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
414
415         for i, k := range ks1 {
416                 service_roots[i] = k.url
417                 defer k.listener.Close()
418         }
419         for i, k := range ks2 {
420                 service_roots[len(ks1)+i] = k.url
421                 defer k.listener.Close()
422         }
423
424         kc.SetServiceRoots(service_roots)
425
426         shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
427
428         _, replicas, err := kc.PutB([]byte("foo"))
429
430         c.Check(err, Equals, InsufficientReplicasError)
431         c.Check(replicas, Equals, 1)
432         c.Check(<-st.handled, Equals, shuff[1])
433
434         log.Printf("TestPutWithTooManyFail done")
435 }
436
437 type StubGetHandler struct {
438         c              *C
439         expectPath     string
440         expectApiToken string
441         returnBody     []byte
442 }
443
444 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
445         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
446         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
447         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
448         resp.Write(this.returnBody)
449 }
450
451 func (s *StandaloneSuite) TestGet(c *C) {
452         log.Printf("TestGet")
453
454         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
455
456         st := StubGetHandler{
457                 c,
458                 hash,
459                 "abc123",
460                 []byte("foo")}
461
462         listener, url := RunBogusKeepServer(st, 2990)
463         defer listener.Close()
464
465         kc, _ := MakeKeepClient()
466         kc.ApiToken = "abc123"
467         kc.SetServiceRoots([]string{url})
468
469         r, n, url2, err := kc.Get(hash)
470         defer r.Close()
471         c.Check(err, Equals, nil)
472         c.Check(n, Equals, int64(3))
473         c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
474
475         content, err2 := ioutil.ReadAll(r)
476         c.Check(err2, Equals, nil)
477         c.Check(content, DeepEquals, []byte("foo"))
478
479         log.Printf("TestGet done")
480 }
481
482 func (s *StandaloneSuite) TestGetFail(c *C) {
483         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
484
485         st := FailHandler{make(chan string, 1)}
486
487         listener, url := RunBogusKeepServer(st, 2990)
488         defer listener.Close()
489
490         kc, _ := MakeKeepClient()
491         kc.ApiToken = "abc123"
492         kc.SetServiceRoots([]string{url})
493
494         r, n, url2, err := kc.Get(hash)
495         c.Check(err, Equals, BlockNotFound)
496         c.Check(n, Equals, int64(0))
497         c.Check(url2, Equals, "")
498         c.Check(r, Equals, nil)
499 }
500
501 type BarHandler struct {
502         handled chan string
503 }
504
505 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
506         resp.Write([]byte("bar"))
507         this.handled <- fmt.Sprintf("http://%s", req.Host)
508 }
509
510 func (s *StandaloneSuite) TestChecksum(c *C) {
511         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
512         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
513
514         st := BarHandler{make(chan string, 1)}
515
516         listener, url := RunBogusKeepServer(st, 2990)
517         defer listener.Close()
518
519         kc, _ := MakeKeepClient()
520         kc.ApiToken = "abc123"
521         kc.SetServiceRoots([]string{url})
522
523         r, n, _, err := kc.Get(barhash)
524         _, err = ioutil.ReadAll(r)
525         c.Check(n, Equals, int64(3))
526         c.Check(err, Equals, nil)
527
528         <-st.handled
529
530         r, n, _, err = kc.Get(foohash)
531         _, err = ioutil.ReadAll(r)
532         c.Check(n, Equals, int64(3))
533         c.Check(err, Equals, BadChecksum)
534
535         <-st.handled
536 }
537
538 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
539
540         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
541
542         fh := FailHandler{
543                 make(chan string, 1)}
544
545         st := StubGetHandler{
546                 c,
547                 hash,
548                 "abc123",
549                 []byte("foo")}
550
551         kc, _ := MakeKeepClient()
552         kc.ApiToken = "abc123"
553         service_roots := make([]string, 5)
554
555         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
556         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
557
558         for i, k := range ks1 {
559                 service_roots[i] = k.url
560                 defer k.listener.Close()
561         }
562         for i, k := range ks2 {
563                 service_roots[len(ks1)+i] = k.url
564                 defer k.listener.Close()
565         }
566
567         kc.SetServiceRoots(service_roots)
568
569         r, n, url2, err := kc.Get(hash)
570         <-fh.handled
571         c.Check(err, Equals, nil)
572         c.Check(n, Equals, int64(3))
573         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
574
575         content, err2 := ioutil.ReadAll(r)
576         c.Check(err2, Equals, nil)
577         c.Check(content, DeepEquals, []byte("foo"))
578 }
579
580 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
581         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
582         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
583         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
584
585         kc, err := MakeKeepClient()
586         c.Assert(err, Equals, nil)
587
588         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
589
590         {
591                 n, _, err := kc.Ask(hash)
592                 c.Check(err, Equals, BlockNotFound)
593                 c.Check(n, Equals, int64(0))
594         }
595         {
596                 hash2, replicas, err := kc.PutB([]byte("foo"))
597                 c.Check(hash2, Equals, hash)
598                 c.Check(replicas, Equals, 2)
599                 c.Check(err, Equals, nil)
600         }
601         {
602                 r, n, url2, err := kc.Get(hash)
603                 c.Check(err, Equals, nil)
604                 c.Check(n, Equals, int64(3))
605                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
606
607                 content, err2 := ioutil.ReadAll(r)
608                 c.Check(err2, Equals, nil)
609                 c.Check(content, DeepEquals, []byte("foo"))
610         }
611         {
612                 n, url2, err := kc.Ask(hash)
613                 c.Check(err, Equals, nil)
614                 c.Check(n, Equals, int64(3))
615                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
616         }
617 }
618
619 type StubProxyHandler struct {
620         handled chan string
621 }
622
623 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
624         resp.Header().Set("X-Keep-Replicas-Stored", "2")
625         this.handled <- fmt.Sprintf("http://%s", req.Host)
626 }
627
628 func (s *StandaloneSuite) TestPutProxy(c *C) {
629         log.Printf("TestPutProxy")
630
631         st := StubProxyHandler{make(chan string, 1)}
632
633         kc, _ := MakeKeepClient()
634
635         kc.Want_replicas = 2
636         kc.Using_proxy = true
637         kc.ApiToken = "abc123"
638         service_roots := make([]string, 1)
639
640         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
641
642         for i, k := range ks1 {
643                 service_roots[i] = k.url
644                 defer k.listener.Close()
645         }
646
647         kc.SetServiceRoots(service_roots)
648
649         _, replicas, err := kc.PutB([]byte("foo"))
650         <-st.handled
651
652         c.Check(err, Equals, nil)
653         c.Check(replicas, Equals, 2)
654
655         log.Printf("TestPutProxy done")
656 }
657
658 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
659         log.Printf("TestPutProxy")
660
661         st := StubProxyHandler{make(chan string, 1)}
662
663         kc, _ := MakeKeepClient()
664
665         kc.Want_replicas = 3
666         kc.Using_proxy = true
667         kc.ApiToken = "abc123"
668         service_roots := make([]string, 1)
669
670         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
671
672         for i, k := range ks1 {
673                 service_roots[i] = k.url
674                 defer k.listener.Close()
675         }
676         kc.SetServiceRoots(service_roots)
677
678         _, replicas, err := kc.PutB([]byte("foo"))
679         <-st.handled
680
681         c.Check(err, Equals, InsufficientReplicasError)
682         c.Check(replicas, Equals, 2)
683
684         log.Printf("TestPutProxy done")
685 }