Merge branch 'master' into 7546-put-retry
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9         "git.curoverse.com/arvados.git/sdk/go/streamer"
10         . "gopkg.in/check.v1"
11         "io"
12         "io/ioutil"
13         "log"
14         "net"
15         "net/http"
16         "os"
17         "testing"
18 )
19
20 // Gocheck boilerplate
21 func Test(t *testing.T) {
22         TestingT(t)
23 }
24
25 // Gocheck boilerplate
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
28
29 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
30
31 // Tests that require the Keep server running
32 type ServerRequiredSuite struct{}
33
34 // Standalone tests
35 type StandaloneSuite struct{}
36
37 func pythonDir() string {
38         cwd, _ := os.Getwd()
39         return fmt.Sprintf("%s/../../python/tests", cwd)
40 }
41
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
43         if *no_server {
44                 c.Skip("Skipping tests that require server")
45                 return
46         }
47         arvadostest.StartAPI()
48         arvadostest.StartKeep(2, false)
49 }
50
51 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
52         if *no_server {
53                 return
54         }
55         arvadostest.StopKeep(2)
56         arvadostest.StopAPI()
57 }
58
59 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
60         arv, err := arvadosclient.MakeArvadosClient()
61         c.Assert(err, Equals, nil)
62
63         kc, err := MakeKeepClient(&arv)
64
65         c.Assert(err, Equals, nil)
66         c.Check(len(kc.LocalRoots()), Equals, 2)
67         for _, root := range kc.LocalRoots() {
68                 c.Check(root, Matches, "http://localhost:\\d+")
69         }
70 }
71
72 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
73         arv, err := arvadosclient.MakeArvadosClient()
74         c.Assert(err, Equals, nil)
75
76         kc, err := MakeKeepClient(&arv)
77         c.Assert(kc.Want_replicas, Equals, 2)
78
79         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
80         kc, err = MakeKeepClient(&arv)
81         c.Assert(kc.Want_replicas, Equals, 3)
82
83         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
84         kc, err = MakeKeepClient(&arv)
85         c.Assert(kc.Want_replicas, Equals, 1)
86 }
87
88 type StubPutHandler struct {
89         c              *C
90         expectPath     string
91         expectApiToken string
92         expectBody     string
93         handled        chan string
94 }
95
96 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
97         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
98         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
99         body, err := ioutil.ReadAll(req.Body)
100         sph.c.Check(err, Equals, nil)
101         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
102         resp.WriteHeader(200)
103         sph.handled <- fmt.Sprintf("http://%s", req.Host)
104 }
105
106 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
107         var err error
108         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
109         if err != nil {
110                 panic(fmt.Sprintf("Could not listen on any port"))
111         }
112         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
113         go http.Serve(ks.listener, st)
114         return
115 }
116
117 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
118         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
119
120         ks := RunFakeKeepServer(st)
121         defer ks.listener.Close()
122
123         arv, _ := arvadosclient.MakeArvadosClient()
124         arv.ApiToken = "abc123"
125
126         kc, _ := MakeKeepClient(&arv)
127
128         reader, writer := io.Pipe()
129         upload_status := make(chan uploadStatus)
130
131         f(kc, ks.url, reader, writer, upload_status)
132 }
133
134 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
135         log.Printf("TestUploadToStubKeepServer")
136
137         st := StubPutHandler{
138                 c,
139                 "acbd18db4cc2f85cedef654fccc4a4d8",
140                 "abc123",
141                 "foo",
142                 make(chan string)}
143
144         UploadToStubHelper(c, st,
145                 func(kc *KeepClient, url string, reader io.ReadCloser,
146                         writer io.WriteCloser, upload_status chan uploadStatus) {
147
148                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
149
150                         writer.Write([]byte("foo"))
151                         writer.Close()
152
153                         <-st.handled
154                         status := <-upload_status
155                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
156                 })
157
158         log.Printf("TestUploadToStubKeepServer done")
159 }
160
161 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
162         log.Printf("TestUploadToStubKeepServerBufferReader")
163
164         st := StubPutHandler{
165                 c,
166                 "acbd18db4cc2f85cedef654fccc4a4d8",
167                 "abc123",
168                 "foo",
169                 make(chan string)}
170
171         UploadToStubHelper(c, st,
172                 func(kc *KeepClient, url string, reader io.ReadCloser,
173                         writer io.WriteCloser, upload_status chan uploadStatus) {
174
175                         tr := streamer.AsyncStreamFromReader(512, reader)
176                         defer tr.Close()
177
178                         br1 := tr.MakeStreamReader()
179
180                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
181
182                         writer.Write([]byte("foo"))
183                         writer.Close()
184
185                         <-st.handled
186
187                         status := <-upload_status
188                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
189                 })
190
191         log.Printf("TestUploadToStubKeepServerBufferReader done")
192 }
193
194 type FailHandler struct {
195         handled chan string
196 }
197
198 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
199         resp.WriteHeader(500)
200         fh.handled <- fmt.Sprintf("http://%s", req.Host)
201 }
202
203 type FailThenSucceedHandler struct {
204         handled        chan string
205         count          int
206         successhandler StubGetHandler
207 }
208
209 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
210         if fh.count == 0 {
211                 resp.WriteHeader(500)
212                 fh.count += 1
213                 fh.handled <- fmt.Sprintf("http://%s", req.Host)
214         } else {
215                 fh.successhandler.ServeHTTP(resp, req)
216         }
217 }
218
219 type Error404Handler struct {
220         handled chan string
221 }
222
223 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
224         resp.WriteHeader(404)
225         fh.handled <- fmt.Sprintf("http://%s", req.Host)
226 }
227
228 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
229         log.Printf("TestFailedUploadToStubKeepServer")
230
231         st := FailHandler{
232                 make(chan string)}
233
234         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
235
236         UploadToStubHelper(c, st,
237                 func(kc *KeepClient, url string, reader io.ReadCloser,
238                         writer io.WriteCloser, upload_status chan uploadStatus) {
239
240                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
241
242                         writer.Write([]byte("foo"))
243                         writer.Close()
244
245                         <-st.handled
246
247                         status := <-upload_status
248                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
249                         c.Check(status.statusCode, Equals, 500)
250                 })
251         log.Printf("TestFailedUploadToStubKeepServer done")
252 }
253
254 type KeepServer struct {
255         listener net.Listener
256         url      string
257 }
258
259 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
260         ks = make([]KeepServer, n)
261
262         for i := 0; i < n; i += 1 {
263                 ks[i] = RunFakeKeepServer(st)
264         }
265
266         return ks
267 }
268
269 func (s *StandaloneSuite) TestPutB(c *C) {
270         log.Printf("TestPutB")
271
272         hash := Md5String("foo")
273
274         st := StubPutHandler{
275                 c,
276                 hash,
277                 "abc123",
278                 "foo",
279                 make(chan string, 5)}
280
281         arv, _ := arvadosclient.MakeArvadosClient()
282         kc, _ := MakeKeepClient(&arv)
283
284         kc.Want_replicas = 2
285         arv.ApiToken = "abc123"
286         localRoots := make(map[string]string)
287         writableLocalRoots := make(map[string]string)
288
289         ks := RunSomeFakeKeepServers(st, 5)
290
291         for i, k := range ks {
292                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
293                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
294                 defer k.listener.Close()
295         }
296
297         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
298
299         kc.PutB([]byte("foo"))
300
301         shuff := NewRootSorter(
302                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
303
304         s1 := <-st.handled
305         s2 := <-st.handled
306         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
307                 (s1 == shuff[1] && s2 == shuff[0]),
308                 Equals,
309                 true)
310
311         log.Printf("TestPutB done")
312 }
313
314 func (s *StandaloneSuite) TestPutHR(c *C) {
315         log.Printf("TestPutHR")
316
317         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
318
319         st := StubPutHandler{
320                 c,
321                 hash,
322                 "abc123",
323                 "foo",
324                 make(chan string, 5)}
325
326         arv, _ := arvadosclient.MakeArvadosClient()
327         kc, _ := MakeKeepClient(&arv)
328
329         kc.Want_replicas = 2
330         arv.ApiToken = "abc123"
331         localRoots := make(map[string]string)
332         writableLocalRoots := make(map[string]string)
333
334         ks := RunSomeFakeKeepServers(st, 5)
335
336         for i, k := range ks {
337                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
338                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
339                 defer k.listener.Close()
340         }
341
342         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
343
344         reader, writer := io.Pipe()
345
346         go func() {
347                 writer.Write([]byte("foo"))
348                 writer.Close()
349         }()
350
351         kc.PutHR(hash, reader, 3)
352
353         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
354         log.Print(shuff)
355
356         s1 := <-st.handled
357         s2 := <-st.handled
358
359         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
360                 (s1 == shuff[1] && s2 == shuff[0]),
361                 Equals,
362                 true)
363
364         log.Printf("TestPutHR done")
365 }
366
367 func (s *StandaloneSuite) TestPutWithFail(c *C) {
368         log.Printf("TestPutWithFail")
369
370         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
371
372         st := StubPutHandler{
373                 c,
374                 hash,
375                 "abc123",
376                 "foo",
377                 make(chan string, 4)}
378
379         fh := FailHandler{
380                 make(chan string, 1)}
381
382         arv, err := arvadosclient.MakeArvadosClient()
383         kc, _ := MakeKeepClient(&arv)
384
385         kc.Want_replicas = 2
386         arv.ApiToken = "abc123"
387         localRoots := make(map[string]string)
388         writableLocalRoots := make(map[string]string)
389
390         ks1 := RunSomeFakeKeepServers(st, 4)
391         ks2 := RunSomeFakeKeepServers(fh, 1)
392
393         for i, k := range ks1 {
394                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
395                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
396                 defer k.listener.Close()
397         }
398         for i, k := range ks2 {
399                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
400                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
401                 defer k.listener.Close()
402         }
403
404         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
405
406         shuff := NewRootSorter(
407                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
408
409         phash, replicas, err := kc.PutB([]byte("foo"))
410
411         <-fh.handled
412
413         c.Check(err, Equals, nil)
414         c.Check(phash, Equals, "")
415         c.Check(replicas, Equals, 2)
416
417         s1 := <-st.handled
418         s2 := <-st.handled
419
420         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
421                 (s1 == shuff[2] && s2 == shuff[1]),
422                 Equals,
423                 true)
424 }
425
426 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
427         log.Printf("TestPutWithTooManyFail")
428
429         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
430
431         st := StubPutHandler{
432                 c,
433                 hash,
434                 "abc123",
435                 "foo",
436                 make(chan string, 1)}
437
438         fh := FailHandler{
439                 make(chan string, 4)}
440
441         arv, err := arvadosclient.MakeArvadosClient()
442         kc, _ := MakeKeepClient(&arv)
443
444         kc.Want_replicas = 2
445         arv.ApiToken = "abc123"
446         localRoots := make(map[string]string)
447         writableLocalRoots := make(map[string]string)
448
449         ks1 := RunSomeFakeKeepServers(st, 1)
450         ks2 := RunSomeFakeKeepServers(fh, 4)
451
452         for i, k := range ks1 {
453                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
454                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
455                 defer k.listener.Close()
456         }
457         for i, k := range ks2 {
458                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
459                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
460                 defer k.listener.Close()
461         }
462
463         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
464
465         _, replicas, err := kc.PutB([]byte("foo"))
466
467         c.Check(err, Equals, InsufficientReplicasError)
468         c.Check(replicas, Equals, 1)
469         c.Check(<-st.handled, Equals, ks1[0].url)
470
471         log.Printf("TestPutWithTooManyFail done")
472 }
473
474 type StubGetHandler struct {
475         c              *C
476         expectPath     string
477         expectApiToken string
478         httpStatus     int
479         body           []byte
480 }
481
482 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
483         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
484         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
485         resp.WriteHeader(sgh.httpStatus)
486         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
487         resp.Write(sgh.body)
488 }
489
490 func (s *StandaloneSuite) TestGet(c *C) {
491         log.Printf("TestGet")
492
493         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
494
495         st := StubGetHandler{
496                 c,
497                 hash,
498                 "abc123",
499                 http.StatusOK,
500                 []byte("foo")}
501
502         ks := RunFakeKeepServer(st)
503         defer ks.listener.Close()
504
505         arv, err := arvadosclient.MakeArvadosClient()
506         kc, _ := MakeKeepClient(&arv)
507         arv.ApiToken = "abc123"
508         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
509
510         r, n, url2, err := kc.Get(hash)
511         defer r.Close()
512         c.Check(err, Equals, nil)
513         c.Check(n, Equals, int64(3))
514         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
515
516         content, err2 := ioutil.ReadAll(r)
517         c.Check(err2, Equals, nil)
518         c.Check(content, DeepEquals, []byte("foo"))
519
520         log.Printf("TestGet done")
521 }
522
523 func (s *StandaloneSuite) TestGet404(c *C) {
524         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
525
526         st := Error404Handler{make(chan string, 1)}
527
528         ks := RunFakeKeepServer(st)
529         defer ks.listener.Close()
530
531         arv, err := arvadosclient.MakeArvadosClient()
532         kc, _ := MakeKeepClient(&arv)
533         arv.ApiToken = "abc123"
534         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
535
536         r, n, url2, err := kc.Get(hash)
537         c.Check(err, Equals, BlockNotFound)
538         c.Check(n, Equals, int64(0))
539         c.Check(url2, Equals, "")
540         c.Check(r, Equals, nil)
541 }
542
543 func (s *StandaloneSuite) TestGetFail(c *C) {
544         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
545
546         st := FailHandler{make(chan string, 1)}
547
548         ks := RunFakeKeepServer(st)
549         defer ks.listener.Close()
550
551         arv, err := arvadosclient.MakeArvadosClient()
552         kc, _ := MakeKeepClient(&arv)
553         arv.ApiToken = "abc123"
554         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
555
556         r, n, url2, err := kc.Get(hash)
557         c.Check(err, Equals, BlockNotFound)
558         c.Check(n, Equals, int64(0))
559         c.Check(url2, Equals, "")
560         c.Check(r, Equals, nil)
561 }
562
563 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
564         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
565
566         st := &FailThenSucceedHandler{make(chan string, 1), 0,
567                 StubGetHandler{
568                         c,
569                         hash,
570                         "abc123",
571                         http.StatusOK,
572                         []byte("foo")}}
573
574         ks := RunFakeKeepServer(st)
575         defer ks.listener.Close()
576
577         arv, err := arvadosclient.MakeArvadosClient()
578         kc, _ := MakeKeepClient(&arv)
579         arv.ApiToken = "abc123"
580         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
581
582         r, n, url2, err := kc.Get(hash)
583         defer r.Close()
584         c.Check(err, Equals, nil)
585         c.Check(n, Equals, int64(3))
586         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
587
588         content, err2 := ioutil.ReadAll(r)
589         c.Check(err2, Equals, nil)
590         c.Check(content, DeepEquals, []byte("foo"))
591 }
592
593 func (s *StandaloneSuite) TestGetNetError(c *C) {
594         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
595
596         arv, err := arvadosclient.MakeArvadosClient()
597         kc, _ := MakeKeepClient(&arv)
598         arv.ApiToken = "abc123"
599         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
600
601         r, n, url2, err := kc.Get(hash)
602         c.Check(err, Equals, BlockNotFound)
603         c.Check(n, Equals, int64(0))
604         c.Check(url2, Equals, "")
605         c.Check(r, Equals, nil)
606 }
607
608 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
609         uuid := "zzzzz-bi6l4-123451234512345"
610         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
611
612         // This one shouldn't be used:
613         ks0 := RunFakeKeepServer(StubGetHandler{
614                 c,
615                 "error if used",
616                 "abc123",
617                 http.StatusOK,
618                 []byte("foo")})
619         defer ks0.listener.Close()
620         // This one should be used:
621         ks := RunFakeKeepServer(StubGetHandler{
622                 c,
623                 hash + "+K@" + uuid,
624                 "abc123",
625                 http.StatusOK,
626                 []byte("foo")})
627         defer ks.listener.Close()
628
629         arv, err := arvadosclient.MakeArvadosClient()
630         kc, _ := MakeKeepClient(&arv)
631         arv.ApiToken = "abc123"
632         kc.SetServiceRoots(
633                 map[string]string{"x": ks0.url},
634                 nil,
635                 map[string]string{uuid: ks.url})
636
637         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
638         defer r.Close()
639         c.Check(err, Equals, nil)
640         c.Check(n, Equals, int64(3))
641         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
642
643         content, err := ioutil.ReadAll(r)
644         c.Check(err, Equals, nil)
645         c.Check(content, DeepEquals, []byte("foo"))
646 }
647
648 // Use a service hint to fetch from a local disk service, overriding
649 // rendezvous probe order.
650 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
651         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
652         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
653
654         // This one shouldn't be used, although it appears first in
655         // rendezvous probe order:
656         ks0 := RunFakeKeepServer(StubGetHandler{
657                 c,
658                 "error if used",
659                 "abc123",
660                 http.StatusOK,
661                 []byte("foo")})
662         defer ks0.listener.Close()
663         // This one should be used:
664         ks := RunFakeKeepServer(StubGetHandler{
665                 c,
666                 hash + "+K@" + uuid,
667                 "abc123",
668                 http.StatusOK,
669                 []byte("foo")})
670         defer ks.listener.Close()
671
672         arv, err := arvadosclient.MakeArvadosClient()
673         kc, _ := MakeKeepClient(&arv)
674         arv.ApiToken = "abc123"
675         kc.SetServiceRoots(
676                 map[string]string{
677                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
678                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
679                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
680                         uuid: ks.url},
681                 nil,
682                 map[string]string{
683                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
684                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
685                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
686                         uuid: ks.url},
687         )
688
689         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
690         defer r.Close()
691         c.Check(err, Equals, nil)
692         c.Check(n, Equals, int64(3))
693         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
694
695         content, err := ioutil.ReadAll(r)
696         c.Check(err, Equals, nil)
697         c.Check(content, DeepEquals, []byte("foo"))
698 }
699
700 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
701         uuid := "zzzzz-bi6l4-123451234512345"
702         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
703
704         ksLocal := RunFakeKeepServer(StubGetHandler{
705                 c,
706                 hash + "+K@" + uuid,
707                 "abc123",
708                 http.StatusOK,
709                 []byte("foo")})
710         defer ksLocal.listener.Close()
711         ksGateway := RunFakeKeepServer(StubGetHandler{
712                 c,
713                 hash + "+K@" + uuid,
714                 "abc123",
715                 http.StatusInternalServerError,
716                 []byte("Error")})
717         defer ksGateway.listener.Close()
718
719         arv, err := arvadosclient.MakeArvadosClient()
720         kc, _ := MakeKeepClient(&arv)
721         arv.ApiToken = "abc123"
722         kc.SetServiceRoots(
723                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
724                 nil,
725                 map[string]string{uuid: ksGateway.url})
726
727         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
728         c.Assert(err, Equals, nil)
729         defer r.Close()
730         c.Check(n, Equals, int64(3))
731         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
732
733         content, err := ioutil.ReadAll(r)
734         c.Check(err, Equals, nil)
735         c.Check(content, DeepEquals, []byte("foo"))
736 }
737
738 type BarHandler struct {
739         handled chan string
740 }
741
742 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
743         resp.Write([]byte("bar"))
744         this.handled <- fmt.Sprintf("http://%s", req.Host)
745 }
746
747 func (s *StandaloneSuite) TestChecksum(c *C) {
748         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
749         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
750
751         st := BarHandler{make(chan string, 1)}
752
753         ks := RunFakeKeepServer(st)
754         defer ks.listener.Close()
755
756         arv, err := arvadosclient.MakeArvadosClient()
757         kc, _ := MakeKeepClient(&arv)
758         arv.ApiToken = "abc123"
759         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
760
761         r, n, _, err := kc.Get(barhash)
762         _, err = ioutil.ReadAll(r)
763         c.Check(n, Equals, int64(3))
764         c.Check(err, Equals, nil)
765
766         <-st.handled
767
768         r, n, _, err = kc.Get(foohash)
769         _, err = ioutil.ReadAll(r)
770         c.Check(n, Equals, int64(3))
771         c.Check(err, Equals, BadChecksum)
772
773         <-st.handled
774 }
775
776 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
777         content := []byte("waz")
778         hash := fmt.Sprintf("%x", md5.Sum(content))
779
780         fh := Error404Handler{
781                 make(chan string, 4)}
782
783         st := StubGetHandler{
784                 c,
785                 hash,
786                 "abc123",
787                 http.StatusOK,
788                 content}
789
790         arv, err := arvadosclient.MakeArvadosClient()
791         kc, _ := MakeKeepClient(&arv)
792         arv.ApiToken = "abc123"
793         localRoots := make(map[string]string)
794         writableLocalRoots := make(map[string]string)
795
796         ks1 := RunSomeFakeKeepServers(st, 1)
797         ks2 := RunSomeFakeKeepServers(fh, 4)
798
799         for i, k := range ks1 {
800                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
801                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
802                 defer k.listener.Close()
803         }
804         for i, k := range ks2 {
805                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
806                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
807                 defer k.listener.Close()
808         }
809
810         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
811
812         // This test works only if one of the failing services is
813         // attempted before the succeeding service. Otherwise,
814         // <-fh.handled below will just hang! (Probe order depends on
815         // the choice of block content "waz" and the UUIDs of the fake
816         // servers, so we just tried different strings until we found
817         // an example that passes this Assert.)
818         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
819
820         r, n, url2, err := kc.Get(hash)
821
822         <-fh.handled
823         c.Check(err, Equals, nil)
824         c.Check(n, Equals, int64(3))
825         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
826
827         read_content, err2 := ioutil.ReadAll(r)
828         c.Check(err2, Equals, nil)
829         c.Check(read_content, DeepEquals, content)
830 }
831
832 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
833         content := []byte("TestPutGetHead")
834
835         arv, err := arvadosclient.MakeArvadosClient()
836         kc, err := MakeKeepClient(&arv)
837         c.Assert(err, Equals, nil)
838
839         hash := fmt.Sprintf("%x", md5.Sum(content))
840
841         {
842                 n, _, err := kc.Ask(hash)
843                 c.Check(err, Equals, BlockNotFound)
844                 c.Check(n, Equals, int64(0))
845         }
846         {
847                 hash2, replicas, err := kc.PutB(content)
848                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
849                 c.Check(replicas, Equals, 2)
850                 c.Check(err, Equals, nil)
851         }
852         {
853                 r, n, url2, err := kc.Get(hash)
854                 c.Check(err, Equals, nil)
855                 c.Check(n, Equals, int64(len(content)))
856                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
857
858                 read_content, err2 := ioutil.ReadAll(r)
859                 c.Check(err2, Equals, nil)
860                 c.Check(read_content, DeepEquals, content)
861         }
862         {
863                 n, url2, err := kc.Ask(hash)
864                 c.Check(err, Equals, nil)
865                 c.Check(n, Equals, int64(len(content)))
866                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
867         }
868 }
869
870 type StubProxyHandler struct {
871         handled chan string
872 }
873
874 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
875         resp.Header().Set("X-Keep-Replicas-Stored", "2")
876         this.handled <- fmt.Sprintf("http://%s", req.Host)
877 }
878
879 func (s *StandaloneSuite) TestPutProxy(c *C) {
880         log.Printf("TestPutProxy")
881
882         st := StubProxyHandler{make(chan string, 1)}
883
884         arv, err := arvadosclient.MakeArvadosClient()
885         kc, _ := MakeKeepClient(&arv)
886
887         kc.Want_replicas = 2
888         kc.Using_proxy = true
889         arv.ApiToken = "abc123"
890         localRoots := make(map[string]string)
891         writableLocalRoots := make(map[string]string)
892
893         ks1 := RunSomeFakeKeepServers(st, 1)
894
895         for i, k := range ks1 {
896                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
897                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
898                 defer k.listener.Close()
899         }
900
901         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
902
903         _, replicas, err := kc.PutB([]byte("foo"))
904         <-st.handled
905
906         c.Check(err, Equals, nil)
907         c.Check(replicas, Equals, 2)
908
909         log.Printf("TestPutProxy done")
910 }
911
912 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
913         log.Printf("TestPutProxy")
914
915         st := StubProxyHandler{make(chan string, 1)}
916
917         arv, err := arvadosclient.MakeArvadosClient()
918         kc, _ := MakeKeepClient(&arv)
919
920         kc.Want_replicas = 3
921         kc.Using_proxy = true
922         arv.ApiToken = "abc123"
923         localRoots := make(map[string]string)
924         writableLocalRoots := make(map[string]string)
925
926         ks1 := RunSomeFakeKeepServers(st, 1)
927
928         for i, k := range ks1 {
929                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
930                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
931                 defer k.listener.Close()
932         }
933         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
934
935         _, replicas, err := kc.PutB([]byte("foo"))
936         <-st.handled
937
938         c.Check(err, Equals, InsufficientReplicasError)
939         c.Check(replicas, Equals, 2)
940
941         log.Printf("TestPutProxy done")
942 }
943
944 func (s *StandaloneSuite) TestMakeLocator(c *C) {
945         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
946         c.Check(err, Equals, nil)
947         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
948         c.Check(l.Size, Equals, 3)
949         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
950 }
951
952 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
953         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
954         c.Check(err, Equals, nil)
955         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
956         c.Check(l.Size, Equals, -1)
957         c.Check(l.Hints, DeepEquals, []string{})
958 }
959
960 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
961         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
962         c.Check(err, Equals, nil)
963         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
964         c.Check(l.Size, Equals, -1)
965         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
966 }
967
968 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
969         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
970         l, err := MakeLocator(str)
971         c.Check(err, Equals, nil)
972         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
973         c.Check(l.Size, Equals, 3)
974         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
975         c.Check(l.String(), Equals, str)
976 }
977
978 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
979         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
980         c.Check(err, Equals, InvalidLocatorError)
981 }
982
983 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
984         hash := Md5String("foo")
985
986         st := StubPutHandler{
987                 c,
988                 hash,
989                 "abc123",
990                 "foo",
991                 make(chan string, 5)}
992
993         arv, _ := arvadosclient.MakeArvadosClient()
994         kc, _ := MakeKeepClient(&arv)
995
996         kc.Want_replicas = 2
997         arv.ApiToken = "abc123"
998         localRoots := make(map[string]string)
999         writableLocalRoots := make(map[string]string)
1000
1001         ks := RunSomeFakeKeepServers(st, 5)
1002
1003         for i, k := range ks {
1004                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1005                 if i == 0 {
1006                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1007                 }
1008                 defer k.listener.Close()
1009         }
1010
1011         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1012
1013         _, replicas, err := kc.PutB([]byte("foo"))
1014
1015         c.Check(err, Equals, InsufficientReplicasError)
1016         c.Check(replicas, Equals, 1)
1017
1018         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1019 }
1020
1021 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1022         hash := Md5String("foo")
1023
1024         st := StubPutHandler{
1025                 c,
1026                 hash,
1027                 "abc123",
1028                 "foo",
1029                 make(chan string, 5)}
1030
1031         arv, _ := arvadosclient.MakeArvadosClient()
1032         kc, _ := MakeKeepClient(&arv)
1033
1034         kc.Want_replicas = 2
1035         arv.ApiToken = "abc123"
1036         localRoots := make(map[string]string)
1037         writableLocalRoots := make(map[string]string)
1038
1039         ks := RunSomeFakeKeepServers(st, 5)
1040
1041         for i, k := range ks {
1042                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1043                 defer k.listener.Close()
1044         }
1045
1046         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1047
1048         _, replicas, err := kc.PutB([]byte("foo"))
1049
1050         c.Check(err, Equals, InsufficientReplicasError)
1051         c.Check(replicas, Equals, 0)
1052 }
1053
1054 type StubGetIndexHandler struct {
1055         c              *C
1056         expectPath     string
1057         expectAPIToken string
1058         httpStatus     int
1059         body           []byte
1060 }
1061
1062 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1063         h.c.Check(req.URL.Path, Equals, h.expectPath)
1064         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1065         resp.WriteHeader(h.httpStatus)
1066         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1067         resp.Write(h.body)
1068 }
1069
1070 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1071         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1072
1073         st := StubGetIndexHandler{
1074                 c,
1075                 "/index",
1076                 "abc123",
1077                 http.StatusOK,
1078                 []byte(hash + "+3 1443559274\n\n")}
1079
1080         ks := RunFakeKeepServer(st)
1081         defer ks.listener.Close()
1082
1083         arv, err := arvadosclient.MakeArvadosClient()
1084         kc, _ := MakeKeepClient(&arv)
1085         arv.ApiToken = "abc123"
1086         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1087
1088         r, err := kc.GetIndex("x", "")
1089         c.Check(err, Equals, nil)
1090
1091         content, err2 := ioutil.ReadAll(r)
1092         c.Check(err2, Equals, nil)
1093         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1094 }
1095
1096 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1097         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1098
1099         st := StubGetIndexHandler{
1100                 c,
1101                 "/index/" + hash[0:3],
1102                 "abc123",
1103                 http.StatusOK,
1104                 []byte(hash + "+3 1443559274\n\n")}
1105
1106         ks := RunFakeKeepServer(st)
1107         defer ks.listener.Close()
1108
1109         arv, err := arvadosclient.MakeArvadosClient()
1110         kc, _ := MakeKeepClient(&arv)
1111         arv.ApiToken = "abc123"
1112         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1113
1114         r, err := kc.GetIndex("x", hash[0:3])
1115         c.Check(err, Equals, nil)
1116
1117         content, err2 := ioutil.ReadAll(r)
1118         c.Check(err2, Equals, nil)
1119         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1120 }
1121
1122 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1123         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1124
1125         st := StubGetIndexHandler{
1126                 c,
1127                 "/index/" + hash[0:3],
1128                 "abc123",
1129                 http.StatusOK,
1130                 []byte(hash)}
1131
1132         ks := RunFakeKeepServer(st)
1133         defer ks.listener.Close()
1134
1135         arv, err := arvadosclient.MakeArvadosClient()
1136         kc, _ := MakeKeepClient(&arv)
1137         arv.ApiToken = "abc123"
1138         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1139
1140         _, err = kc.GetIndex("x", hash[0:3])
1141         c.Check(err, Equals, ErrIncompleteIndex)
1142 }
1143
1144 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1145         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1146
1147         st := StubGetIndexHandler{
1148                 c,
1149                 "/index/" + hash[0:3],
1150                 "abc123",
1151                 http.StatusOK,
1152                 []byte(hash)}
1153
1154         ks := RunFakeKeepServer(st)
1155         defer ks.listener.Close()
1156
1157         arv, err := arvadosclient.MakeArvadosClient()
1158         kc, _ := MakeKeepClient(&arv)
1159         arv.ApiToken = "abc123"
1160         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1161
1162         _, err = kc.GetIndex("y", hash[0:3])
1163         c.Check(err, Equals, ErrNoSuchKeepServer)
1164 }
1165
1166 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1167         st := StubGetIndexHandler{
1168                 c,
1169                 "/index/abcd",
1170                 "abc123",
1171                 http.StatusOK,
1172                 []byte("\n")}
1173
1174         ks := RunFakeKeepServer(st)
1175         defer ks.listener.Close()
1176
1177         arv, err := arvadosclient.MakeArvadosClient()
1178         kc, _ := MakeKeepClient(&arv)
1179         arv.ApiToken = "abc123"
1180         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1181
1182         r, err := kc.GetIndex("x", "abcd")
1183         c.Check(err, Equals, nil)
1184
1185         content, err2 := ioutil.ReadAll(r)
1186         c.Check(err2, Equals, nil)
1187         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1188 }
1189
1190 type FailThenSucceedPutHandler struct {
1191         handled        chan string
1192         count          int
1193         successhandler StubPutHandler
1194 }
1195
1196 func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1197         if h.count == 0 {
1198                 resp.WriteHeader(500)
1199                 h.count += 1
1200                 h.handled <- fmt.Sprintf("http://%s", req.Host)
1201         } else {
1202                 h.successhandler.ServeHTTP(resp, req)
1203         }
1204 }
1205
1206 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1207         st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
1208                 StubPutHandler{
1209                         c,
1210                         Md5String("foo"),
1211                         "abc123",
1212                         "foo",
1213                         make(chan string, 5)}}
1214
1215         arv, _ := arvadosclient.MakeArvadosClient()
1216         kc, _ := MakeKeepClient(&arv)
1217
1218         kc.Want_replicas = 2
1219         arv.ApiToken = "abc123"
1220         localRoots := make(map[string]string)
1221         writableLocalRoots := make(map[string]string)
1222
1223         ks := RunSomeFakeKeepServers(st, 2)
1224
1225         for i, k := range ks {
1226                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1227                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1228                 defer k.listener.Close()
1229         }
1230
1231         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1232
1233         hash, replicas, err := kc.PutB([]byte("foo"))
1234
1235         c.Check(err, Equals, nil)
1236         c.Check(hash, Equals, "")
1237         c.Check(replicas, Equals, 2)
1238 }