Merge branch '7587-httplib2-retries-wip'
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9         "git.curoverse.com/arvados.git/sdk/go/streamer"
10         . "gopkg.in/check.v1"
11         "io"
12         "io/ioutil"
13         "log"
14         "net"
15         "net/http"
16         "os"
17         "testing"
18 )
19
20 // Gocheck boilerplate
21 func Test(t *testing.T) {
22         TestingT(t)
23 }
24
25 // Gocheck boilerplate
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
28
29 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
30
31 // Tests that require the Keep server running
32 type ServerRequiredSuite struct{}
33
34 // Standalone tests
35 type StandaloneSuite struct{}
36
37 func pythonDir() string {
38         cwd, _ := os.Getwd()
39         return fmt.Sprintf("%s/../../python/tests", cwd)
40 }
41
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
43         if *no_server {
44                 c.Skip("Skipping tests that require server")
45                 return
46         }
47         arvadostest.StartAPI()
48         arvadostest.StartKeep(2, false)
49 }
50
51 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
52         if *no_server {
53                 return
54         }
55         arvadostest.StopKeep(2)
56         arvadostest.StopAPI()
57 }
58
59 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
60         arv, err := arvadosclient.MakeArvadosClient()
61         c.Assert(err, Equals, nil)
62
63         kc, err := MakeKeepClient(&arv)
64
65         c.Assert(err, Equals, nil)
66         c.Check(len(kc.LocalRoots()), Equals, 2)
67         for _, root := range kc.LocalRoots() {
68                 c.Check(root, Matches, "http://localhost:\\d+")
69         }
70 }
71
72 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
73         arv, err := arvadosclient.MakeArvadosClient()
74         c.Assert(err, Equals, nil)
75
76         kc, err := MakeKeepClient(&arv)
77         c.Assert(kc.Want_replicas, Equals, 2)
78
79         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
80         kc, err = MakeKeepClient(&arv)
81         c.Assert(kc.Want_replicas, Equals, 3)
82
83         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
84         kc, err = MakeKeepClient(&arv)
85         c.Assert(kc.Want_replicas, Equals, 1)
86 }
87
88 type StubPutHandler struct {
89         c              *C
90         expectPath     string
91         expectApiToken string
92         expectBody     string
93         handled        chan string
94 }
95
96 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
97         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
98         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
99         body, err := ioutil.ReadAll(req.Body)
100         sph.c.Check(err, Equals, nil)
101         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
102         resp.WriteHeader(200)
103         sph.handled <- fmt.Sprintf("http://%s", req.Host)
104 }
105
106 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
107         var err error
108         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
109         if err != nil {
110                 panic(fmt.Sprintf("Could not listen on any port"))
111         }
112         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
113         go http.Serve(ks.listener, st)
114         return
115 }
116
117 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
118         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
119
120         ks := RunFakeKeepServer(st)
121         defer ks.listener.Close()
122
123         arv, _ := arvadosclient.MakeArvadosClient()
124         arv.ApiToken = "abc123"
125
126         kc, _ := MakeKeepClient(&arv)
127
128         reader, writer := io.Pipe()
129         upload_status := make(chan uploadStatus)
130
131         f(kc, ks.url, reader, writer, upload_status)
132 }
133
134 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
135         log.Printf("TestUploadToStubKeepServer")
136
137         st := StubPutHandler{
138                 c,
139                 "acbd18db4cc2f85cedef654fccc4a4d8",
140                 "abc123",
141                 "foo",
142                 make(chan string)}
143
144         UploadToStubHelper(c, st,
145                 func(kc *KeepClient, url string, reader io.ReadCloser,
146                         writer io.WriteCloser, upload_status chan uploadStatus) {
147
148                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
149
150                         writer.Write([]byte("foo"))
151                         writer.Close()
152
153                         <-st.handled
154                         status := <-upload_status
155                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
156                 })
157
158         log.Printf("TestUploadToStubKeepServer done")
159 }
160
161 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
162         log.Printf("TestUploadToStubKeepServerBufferReader")
163
164         st := StubPutHandler{
165                 c,
166                 "acbd18db4cc2f85cedef654fccc4a4d8",
167                 "abc123",
168                 "foo",
169                 make(chan string)}
170
171         UploadToStubHelper(c, st,
172                 func(kc *KeepClient, url string, reader io.ReadCloser,
173                         writer io.WriteCloser, upload_status chan uploadStatus) {
174
175                         tr := streamer.AsyncStreamFromReader(512, reader)
176                         defer tr.Close()
177
178                         br1 := tr.MakeStreamReader()
179
180                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
181
182                         writer.Write([]byte("foo"))
183                         writer.Close()
184
185                         <-st.handled
186
187                         status := <-upload_status
188                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
189                 })
190
191         log.Printf("TestUploadToStubKeepServerBufferReader done")
192 }
193
194 type FailHandler struct {
195         handled chan string
196 }
197
198 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
199         resp.WriteHeader(500)
200         fh.handled <- fmt.Sprintf("http://%s", req.Host)
201 }
202
203 type FailThenSucceedHandler struct {
204         handled        chan string
205         count          int
206         successhandler StubGetHandler
207 }
208
209 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
210         if fh.count == 0 {
211                 resp.WriteHeader(500)
212                 fh.count += 1
213                 fh.handled <- fmt.Sprintf("http://%s", req.Host)
214         } else {
215                 fh.successhandler.ServeHTTP(resp, req)
216         }
217 }
218
219 type Error404Handler struct {
220         handled chan string
221 }
222
223 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
224         resp.WriteHeader(404)
225         fh.handled <- fmt.Sprintf("http://%s", req.Host)
226 }
227
228 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
229         log.Printf("TestFailedUploadToStubKeepServer")
230
231         st := FailHandler{
232                 make(chan string)}
233
234         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
235
236         UploadToStubHelper(c, st,
237                 func(kc *KeepClient, url string, reader io.ReadCloser,
238                         writer io.WriteCloser, upload_status chan uploadStatus) {
239
240                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
241
242                         writer.Write([]byte("foo"))
243                         writer.Close()
244
245                         <-st.handled
246
247                         status := <-upload_status
248                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
249                         c.Check(status.statusCode, Equals, 500)
250                 })
251         log.Printf("TestFailedUploadToStubKeepServer done")
252 }
253
254 type KeepServer struct {
255         listener net.Listener
256         url      string
257 }
258
259 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
260         ks = make([]KeepServer, n)
261
262         for i := 0; i < n; i += 1 {
263                 ks[i] = RunFakeKeepServer(st)
264         }
265
266         return ks
267 }
268
269 func (s *StandaloneSuite) TestPutB(c *C) {
270         log.Printf("TestPutB")
271
272         hash := Md5String("foo")
273
274         st := StubPutHandler{
275                 c,
276                 hash,
277                 "abc123",
278                 "foo",
279                 make(chan string, 5)}
280
281         arv, _ := arvadosclient.MakeArvadosClient()
282         kc, _ := MakeKeepClient(&arv)
283
284         kc.Want_replicas = 2
285         arv.ApiToken = "abc123"
286         localRoots := make(map[string]string)
287         writableLocalRoots := make(map[string]string)
288
289         ks := RunSomeFakeKeepServers(st, 5)
290
291         for i, k := range ks {
292                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
293                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
294                 defer k.listener.Close()
295         }
296
297         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
298
299         kc.PutB([]byte("foo"))
300
301         shuff := NewRootSorter(
302                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
303
304         s1 := <-st.handled
305         s2 := <-st.handled
306         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
307                 (s1 == shuff[1] && s2 == shuff[0]),
308                 Equals,
309                 true)
310
311         log.Printf("TestPutB done")
312 }
313
314 func (s *StandaloneSuite) TestPutHR(c *C) {
315         log.Printf("TestPutHR")
316
317         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
318
319         st := StubPutHandler{
320                 c,
321                 hash,
322                 "abc123",
323                 "foo",
324                 make(chan string, 5)}
325
326         arv, _ := arvadosclient.MakeArvadosClient()
327         kc, _ := MakeKeepClient(&arv)
328
329         kc.Want_replicas = 2
330         arv.ApiToken = "abc123"
331         localRoots := make(map[string]string)
332         writableLocalRoots := make(map[string]string)
333
334         ks := RunSomeFakeKeepServers(st, 5)
335
336         for i, k := range ks {
337                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
338                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
339                 defer k.listener.Close()
340         }
341
342         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
343
344         reader, writer := io.Pipe()
345
346         go func() {
347                 writer.Write([]byte("foo"))
348                 writer.Close()
349         }()
350
351         kc.PutHR(hash, reader, 3)
352
353         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
354         log.Print(shuff)
355
356         s1 := <-st.handled
357         s2 := <-st.handled
358
359         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
360                 (s1 == shuff[1] && s2 == shuff[0]),
361                 Equals,
362                 true)
363
364         log.Printf("TestPutHR done")
365 }
366
367 func (s *StandaloneSuite) TestPutWithFail(c *C) {
368         log.Printf("TestPutWithFail")
369
370         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
371
372         st := StubPutHandler{
373                 c,
374                 hash,
375                 "abc123",
376                 "foo",
377                 make(chan string, 4)}
378
379         fh := FailHandler{
380                 make(chan string, 1)}
381
382         arv, err := arvadosclient.MakeArvadosClient()
383         kc, _ := MakeKeepClient(&arv)
384
385         kc.Want_replicas = 2
386         arv.ApiToken = "abc123"
387         localRoots := make(map[string]string)
388         writableLocalRoots := make(map[string]string)
389
390         ks1 := RunSomeFakeKeepServers(st, 4)
391         ks2 := RunSomeFakeKeepServers(fh, 1)
392
393         for i, k := range ks1 {
394                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
395                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
396                 defer k.listener.Close()
397         }
398         for i, k := range ks2 {
399                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
400                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
401                 defer k.listener.Close()
402         }
403
404         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
405
406         shuff := NewRootSorter(
407                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
408
409         phash, replicas, err := kc.PutB([]byte("foo"))
410
411         <-fh.handled
412
413         c.Check(err, Equals, nil)
414         c.Check(phash, Equals, "")
415         c.Check(replicas, Equals, 2)
416
417         s1 := <-st.handled
418         s2 := <-st.handled
419
420         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
421                 (s1 == shuff[2] && s2 == shuff[1]),
422                 Equals,
423                 true)
424 }
425
426 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
427         log.Printf("TestPutWithTooManyFail")
428
429         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
430
431         st := StubPutHandler{
432                 c,
433                 hash,
434                 "abc123",
435                 "foo",
436                 make(chan string, 1)}
437
438         fh := FailHandler{
439                 make(chan string, 4)}
440
441         arv, err := arvadosclient.MakeArvadosClient()
442         kc, _ := MakeKeepClient(&arv)
443
444         kc.Want_replicas = 2
445         kc.Retries = 0
446         arv.ApiToken = "abc123"
447         localRoots := make(map[string]string)
448         writableLocalRoots := make(map[string]string)
449
450         ks1 := RunSomeFakeKeepServers(st, 1)
451         ks2 := RunSomeFakeKeepServers(fh, 4)
452
453         for i, k := range ks1 {
454                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
455                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
456                 defer k.listener.Close()
457         }
458         for i, k := range ks2 {
459                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
460                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
461                 defer k.listener.Close()
462         }
463
464         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
465
466         _, replicas, err := kc.PutB([]byte("foo"))
467
468         c.Check(err, Equals, InsufficientReplicasError)
469         c.Check(replicas, Equals, 1)
470         c.Check(<-st.handled, Equals, ks1[0].url)
471
472         log.Printf("TestPutWithTooManyFail done")
473 }
474
475 type StubGetHandler struct {
476         c              *C
477         expectPath     string
478         expectApiToken string
479         httpStatus     int
480         body           []byte
481 }
482
483 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
484         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
485         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
486         resp.WriteHeader(sgh.httpStatus)
487         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
488         resp.Write(sgh.body)
489 }
490
491 func (s *StandaloneSuite) TestGet(c *C) {
492         log.Printf("TestGet")
493
494         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
495
496         st := StubGetHandler{
497                 c,
498                 hash,
499                 "abc123",
500                 http.StatusOK,
501                 []byte("foo")}
502
503         ks := RunFakeKeepServer(st)
504         defer ks.listener.Close()
505
506         arv, err := arvadosclient.MakeArvadosClient()
507         kc, _ := MakeKeepClient(&arv)
508         arv.ApiToken = "abc123"
509         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
510
511         r, n, url2, err := kc.Get(hash)
512         defer r.Close()
513         c.Check(err, Equals, nil)
514         c.Check(n, Equals, int64(3))
515         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
516
517         content, err2 := ioutil.ReadAll(r)
518         c.Check(err2, Equals, nil)
519         c.Check(content, DeepEquals, []byte("foo"))
520
521         log.Printf("TestGet done")
522 }
523
524 func (s *StandaloneSuite) TestGet404(c *C) {
525         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
526
527         st := Error404Handler{make(chan string, 1)}
528
529         ks := RunFakeKeepServer(st)
530         defer ks.listener.Close()
531
532         arv, err := arvadosclient.MakeArvadosClient()
533         kc, _ := MakeKeepClient(&arv)
534         arv.ApiToken = "abc123"
535         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
536
537         r, n, url2, err := kc.Get(hash)
538         c.Check(err, Equals, BlockNotFound)
539         c.Check(n, Equals, int64(0))
540         c.Check(url2, Equals, "")
541         c.Check(r, Equals, nil)
542 }
543
544 func (s *StandaloneSuite) TestGetFail(c *C) {
545         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
546
547         st := FailHandler{make(chan string, 1)}
548
549         ks := RunFakeKeepServer(st)
550         defer ks.listener.Close()
551
552         arv, err := arvadosclient.MakeArvadosClient()
553         kc, _ := MakeKeepClient(&arv)
554         arv.ApiToken = "abc123"
555         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
556         kc.Retries = 0
557
558         r, n, url2, err := kc.Get(hash)
559         c.Check(err, Equals, BlockNotFound)
560         c.Check(n, Equals, int64(0))
561         c.Check(url2, Equals, "")
562         c.Check(r, Equals, nil)
563 }
564
565 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
566         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
567
568         st := &FailThenSucceedHandler{make(chan string, 1), 0,
569                 StubGetHandler{
570                         c,
571                         hash,
572                         "abc123",
573                         http.StatusOK,
574                         []byte("foo")}}
575
576         ks := RunFakeKeepServer(st)
577         defer ks.listener.Close()
578
579         arv, err := arvadosclient.MakeArvadosClient()
580         kc, _ := MakeKeepClient(&arv)
581         arv.ApiToken = "abc123"
582         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
583
584         r, n, url2, err := kc.Get(hash)
585         defer r.Close()
586         c.Check(err, Equals, nil)
587         c.Check(n, Equals, int64(3))
588         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
589
590         content, err2 := ioutil.ReadAll(r)
591         c.Check(err2, Equals, nil)
592         c.Check(content, DeepEquals, []byte("foo"))
593 }
594
595 func (s *StandaloneSuite) TestGetNetError(c *C) {
596         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
597
598         arv, err := arvadosclient.MakeArvadosClient()
599         kc, _ := MakeKeepClient(&arv)
600         arv.ApiToken = "abc123"
601         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
602
603         r, n, url2, err := kc.Get(hash)
604         c.Check(err, Equals, BlockNotFound)
605         c.Check(n, Equals, int64(0))
606         c.Check(url2, Equals, "")
607         c.Check(r, Equals, nil)
608 }
609
610 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
611         uuid := "zzzzz-bi6l4-123451234512345"
612         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
613
614         // This one shouldn't be used:
615         ks0 := RunFakeKeepServer(StubGetHandler{
616                 c,
617                 "error if used",
618                 "abc123",
619                 http.StatusOK,
620                 []byte("foo")})
621         defer ks0.listener.Close()
622         // This one should be used:
623         ks := RunFakeKeepServer(StubGetHandler{
624                 c,
625                 hash + "+K@" + uuid,
626                 "abc123",
627                 http.StatusOK,
628                 []byte("foo")})
629         defer ks.listener.Close()
630
631         arv, err := arvadosclient.MakeArvadosClient()
632         kc, _ := MakeKeepClient(&arv)
633         arv.ApiToken = "abc123"
634         kc.SetServiceRoots(
635                 map[string]string{"x": ks0.url},
636                 nil,
637                 map[string]string{uuid: ks.url})
638
639         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
640         defer r.Close()
641         c.Check(err, Equals, nil)
642         c.Check(n, Equals, int64(3))
643         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
644
645         content, err := ioutil.ReadAll(r)
646         c.Check(err, Equals, nil)
647         c.Check(content, DeepEquals, []byte("foo"))
648 }
649
650 // Use a service hint to fetch from a local disk service, overriding
651 // rendezvous probe order.
652 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
653         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
654         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
655
656         // This one shouldn't be used, although it appears first in
657         // rendezvous probe order:
658         ks0 := RunFakeKeepServer(StubGetHandler{
659                 c,
660                 "error if used",
661                 "abc123",
662                 http.StatusOK,
663                 []byte("foo")})
664         defer ks0.listener.Close()
665         // This one should be used:
666         ks := RunFakeKeepServer(StubGetHandler{
667                 c,
668                 hash + "+K@" + uuid,
669                 "abc123",
670                 http.StatusOK,
671                 []byte("foo")})
672         defer ks.listener.Close()
673
674         arv, err := arvadosclient.MakeArvadosClient()
675         kc, _ := MakeKeepClient(&arv)
676         arv.ApiToken = "abc123"
677         kc.SetServiceRoots(
678                 map[string]string{
679                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
680                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
681                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
682                         uuid: ks.url},
683                 nil,
684                 map[string]string{
685                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
686                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
687                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
688                         uuid: ks.url},
689         )
690
691         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
692         defer r.Close()
693         c.Check(err, Equals, nil)
694         c.Check(n, Equals, int64(3))
695         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
696
697         content, err := ioutil.ReadAll(r)
698         c.Check(err, Equals, nil)
699         c.Check(content, DeepEquals, []byte("foo"))
700 }
701
702 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
703         uuid := "zzzzz-bi6l4-123451234512345"
704         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
705
706         ksLocal := RunFakeKeepServer(StubGetHandler{
707                 c,
708                 hash + "+K@" + uuid,
709                 "abc123",
710                 http.StatusOK,
711                 []byte("foo")})
712         defer ksLocal.listener.Close()
713         ksGateway := RunFakeKeepServer(StubGetHandler{
714                 c,
715                 hash + "+K@" + uuid,
716                 "abc123",
717                 http.StatusInternalServerError,
718                 []byte("Error")})
719         defer ksGateway.listener.Close()
720
721         arv, err := arvadosclient.MakeArvadosClient()
722         kc, _ := MakeKeepClient(&arv)
723         arv.ApiToken = "abc123"
724         kc.SetServiceRoots(
725                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
726                 nil,
727                 map[string]string{uuid: ksGateway.url})
728
729         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
730         c.Assert(err, Equals, nil)
731         defer r.Close()
732         c.Check(n, Equals, int64(3))
733         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
734
735         content, err := ioutil.ReadAll(r)
736         c.Check(err, Equals, nil)
737         c.Check(content, DeepEquals, []byte("foo"))
738 }
739
740 type BarHandler struct {
741         handled chan string
742 }
743
744 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
745         resp.Write([]byte("bar"))
746         this.handled <- fmt.Sprintf("http://%s", req.Host)
747 }
748
749 func (s *StandaloneSuite) TestChecksum(c *C) {
750         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
751         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
752
753         st := BarHandler{make(chan string, 1)}
754
755         ks := RunFakeKeepServer(st)
756         defer ks.listener.Close()
757
758         arv, err := arvadosclient.MakeArvadosClient()
759         kc, _ := MakeKeepClient(&arv)
760         arv.ApiToken = "abc123"
761         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
762
763         r, n, _, err := kc.Get(barhash)
764         _, err = ioutil.ReadAll(r)
765         c.Check(n, Equals, int64(3))
766         c.Check(err, Equals, nil)
767
768         <-st.handled
769
770         r, n, _, err = kc.Get(foohash)
771         _, err = ioutil.ReadAll(r)
772         c.Check(n, Equals, int64(3))
773         c.Check(err, Equals, BadChecksum)
774
775         <-st.handled
776 }
777
778 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
779         content := []byte("waz")
780         hash := fmt.Sprintf("%x", md5.Sum(content))
781
782         fh := Error404Handler{
783                 make(chan string, 4)}
784
785         st := StubGetHandler{
786                 c,
787                 hash,
788                 "abc123",
789                 http.StatusOK,
790                 content}
791
792         arv, err := arvadosclient.MakeArvadosClient()
793         kc, _ := MakeKeepClient(&arv)
794         arv.ApiToken = "abc123"
795         localRoots := make(map[string]string)
796         writableLocalRoots := make(map[string]string)
797
798         ks1 := RunSomeFakeKeepServers(st, 1)
799         ks2 := RunSomeFakeKeepServers(fh, 4)
800
801         for i, k := range ks1 {
802                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
803                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
804                 defer k.listener.Close()
805         }
806         for i, k := range ks2 {
807                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
808                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
809                 defer k.listener.Close()
810         }
811
812         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
813         kc.Retries = 0
814
815         // This test works only if one of the failing services is
816         // attempted before the succeeding service. Otherwise,
817         // <-fh.handled below will just hang! (Probe order depends on
818         // the choice of block content "waz" and the UUIDs of the fake
819         // servers, so we just tried different strings until we found
820         // an example that passes this Assert.)
821         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
822
823         r, n, url2, err := kc.Get(hash)
824
825         <-fh.handled
826         c.Check(err, Equals, nil)
827         c.Check(n, Equals, int64(3))
828         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
829
830         read_content, err2 := ioutil.ReadAll(r)
831         c.Check(err2, Equals, nil)
832         c.Check(read_content, DeepEquals, content)
833 }
834
835 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
836         content := []byte("TestPutGetHead")
837
838         arv, err := arvadosclient.MakeArvadosClient()
839         kc, err := MakeKeepClient(&arv)
840         c.Assert(err, Equals, nil)
841
842         hash := fmt.Sprintf("%x", md5.Sum(content))
843
844         {
845                 n, _, err := kc.Ask(hash)
846                 c.Check(err, Equals, BlockNotFound)
847                 c.Check(n, Equals, int64(0))
848         }
849         {
850                 hash2, replicas, err := kc.PutB(content)
851                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
852                 c.Check(replicas, Equals, 2)
853                 c.Check(err, Equals, nil)
854         }
855         {
856                 r, n, url2, err := kc.Get(hash)
857                 c.Check(err, Equals, nil)
858                 c.Check(n, Equals, int64(len(content)))
859                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
860
861                 read_content, err2 := ioutil.ReadAll(r)
862                 c.Check(err2, Equals, nil)
863                 c.Check(read_content, DeepEquals, content)
864         }
865         {
866                 n, url2, err := kc.Ask(hash)
867                 c.Check(err, Equals, nil)
868                 c.Check(n, Equals, int64(len(content)))
869                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
870         }
871 }
872
873 type StubProxyHandler struct {
874         handled chan string
875 }
876
877 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
878         resp.Header().Set("X-Keep-Replicas-Stored", "2")
879         this.handled <- fmt.Sprintf("http://%s", req.Host)
880 }
881
882 func (s *StandaloneSuite) TestPutProxy(c *C) {
883         log.Printf("TestPutProxy")
884
885         st := StubProxyHandler{make(chan string, 1)}
886
887         arv, err := arvadosclient.MakeArvadosClient()
888         kc, _ := MakeKeepClient(&arv)
889
890         kc.Want_replicas = 2
891         kc.Using_proxy = true
892         arv.ApiToken = "abc123"
893         localRoots := make(map[string]string)
894         writableLocalRoots := make(map[string]string)
895
896         ks1 := RunSomeFakeKeepServers(st, 1)
897
898         for i, k := range ks1 {
899                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
900                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
901                 defer k.listener.Close()
902         }
903
904         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
905
906         _, replicas, err := kc.PutB([]byte("foo"))
907         <-st.handled
908
909         c.Check(err, Equals, nil)
910         c.Check(replicas, Equals, 2)
911
912         log.Printf("TestPutProxy done")
913 }
914
915 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
916         log.Printf("TestPutProxy")
917
918         st := StubProxyHandler{make(chan string, 1)}
919
920         arv, err := arvadosclient.MakeArvadosClient()
921         kc, _ := MakeKeepClient(&arv)
922
923         kc.Want_replicas = 3
924         kc.Using_proxy = true
925         arv.ApiToken = "abc123"
926         localRoots := make(map[string]string)
927         writableLocalRoots := make(map[string]string)
928
929         ks1 := RunSomeFakeKeepServers(st, 1)
930
931         for i, k := range ks1 {
932                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
933                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
934                 defer k.listener.Close()
935         }
936         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
937
938         _, replicas, err := kc.PutB([]byte("foo"))
939         <-st.handled
940
941         c.Check(err, Equals, InsufficientReplicasError)
942         c.Check(replicas, Equals, 2)
943
944         log.Printf("TestPutProxy done")
945 }
946
947 func (s *StandaloneSuite) TestMakeLocator(c *C) {
948         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
949         c.Check(err, Equals, nil)
950         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
951         c.Check(l.Size, Equals, 3)
952         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
953 }
954
955 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
956         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
957         c.Check(err, Equals, nil)
958         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
959         c.Check(l.Size, Equals, -1)
960         c.Check(l.Hints, DeepEquals, []string{})
961 }
962
963 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
964         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
965         c.Check(err, Equals, nil)
966         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
967         c.Check(l.Size, Equals, -1)
968         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
969 }
970
971 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
972         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
973         l, err := MakeLocator(str)
974         c.Check(err, Equals, nil)
975         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
976         c.Check(l.Size, Equals, 3)
977         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
978         c.Check(l.String(), Equals, str)
979 }
980
981 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
982         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
983         c.Check(err, Equals, InvalidLocatorError)
984 }
985
986 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
987         hash := Md5String("foo")
988
989         st := StubPutHandler{
990                 c,
991                 hash,
992                 "abc123",
993                 "foo",
994                 make(chan string, 5)}
995
996         arv, _ := arvadosclient.MakeArvadosClient()
997         kc, _ := MakeKeepClient(&arv)
998
999         kc.Want_replicas = 2
1000         arv.ApiToken = "abc123"
1001         localRoots := make(map[string]string)
1002         writableLocalRoots := make(map[string]string)
1003
1004         ks := RunSomeFakeKeepServers(st, 5)
1005
1006         for i, k := range ks {
1007                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1008                 if i == 0 {
1009                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1010                 }
1011                 defer k.listener.Close()
1012         }
1013
1014         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1015
1016         _, replicas, err := kc.PutB([]byte("foo"))
1017
1018         c.Check(err, Equals, InsufficientReplicasError)
1019         c.Check(replicas, Equals, 1)
1020
1021         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1022 }
1023
1024 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1025         hash := Md5String("foo")
1026
1027         st := StubPutHandler{
1028                 c,
1029                 hash,
1030                 "abc123",
1031                 "foo",
1032                 make(chan string, 5)}
1033
1034         arv, _ := arvadosclient.MakeArvadosClient()
1035         kc, _ := MakeKeepClient(&arv)
1036
1037         kc.Want_replicas = 2
1038         arv.ApiToken = "abc123"
1039         localRoots := make(map[string]string)
1040         writableLocalRoots := make(map[string]string)
1041
1042         ks := RunSomeFakeKeepServers(st, 5)
1043
1044         for i, k := range ks {
1045                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1046                 defer k.listener.Close()
1047         }
1048
1049         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1050
1051         _, replicas, err := kc.PutB([]byte("foo"))
1052
1053         c.Check(err, Equals, InsufficientReplicasError)
1054         c.Check(replicas, Equals, 0)
1055 }
1056
1057 type StubGetIndexHandler struct {
1058         c              *C
1059         expectPath     string
1060         expectAPIToken string
1061         httpStatus     int
1062         body           []byte
1063 }
1064
1065 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1066         h.c.Check(req.URL.Path, Equals, h.expectPath)
1067         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1068         resp.WriteHeader(h.httpStatus)
1069         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1070         resp.Write(h.body)
1071 }
1072
1073 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1074         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1075
1076         st := StubGetIndexHandler{
1077                 c,
1078                 "/index",
1079                 "abc123",
1080                 http.StatusOK,
1081                 []byte(hash + "+3 1443559274\n\n")}
1082
1083         ks := RunFakeKeepServer(st)
1084         defer ks.listener.Close()
1085
1086         arv, err := arvadosclient.MakeArvadosClient()
1087         kc, _ := MakeKeepClient(&arv)
1088         arv.ApiToken = "abc123"
1089         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1090
1091         r, err := kc.GetIndex("x", "")
1092         c.Check(err, Equals, nil)
1093
1094         content, err2 := ioutil.ReadAll(r)
1095         c.Check(err2, Equals, nil)
1096         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1097 }
1098
1099 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1100         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1101
1102         st := StubGetIndexHandler{
1103                 c,
1104                 "/index/" + hash[0:3],
1105                 "abc123",
1106                 http.StatusOK,
1107                 []byte(hash + "+3 1443559274\n\n")}
1108
1109         ks := RunFakeKeepServer(st)
1110         defer ks.listener.Close()
1111
1112         arv, err := arvadosclient.MakeArvadosClient()
1113         kc, _ := MakeKeepClient(&arv)
1114         arv.ApiToken = "abc123"
1115         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1116
1117         r, err := kc.GetIndex("x", hash[0:3])
1118         c.Check(err, Equals, nil)
1119
1120         content, err2 := ioutil.ReadAll(r)
1121         c.Check(err2, Equals, nil)
1122         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1123 }
1124
1125 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1126         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1127
1128         st := StubGetIndexHandler{
1129                 c,
1130                 "/index/" + hash[0:3],
1131                 "abc123",
1132                 http.StatusOK,
1133                 []byte(hash)}
1134
1135         ks := RunFakeKeepServer(st)
1136         defer ks.listener.Close()
1137
1138         arv, err := arvadosclient.MakeArvadosClient()
1139         kc, _ := MakeKeepClient(&arv)
1140         arv.ApiToken = "abc123"
1141         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1142
1143         _, err = kc.GetIndex("x", hash[0:3])
1144         c.Check(err, Equals, ErrIncompleteIndex)
1145 }
1146
1147 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1148         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1149
1150         st := StubGetIndexHandler{
1151                 c,
1152                 "/index/" + hash[0:3],
1153                 "abc123",
1154                 http.StatusOK,
1155                 []byte(hash)}
1156
1157         ks := RunFakeKeepServer(st)
1158         defer ks.listener.Close()
1159
1160         arv, err := arvadosclient.MakeArvadosClient()
1161         kc, _ := MakeKeepClient(&arv)
1162         arv.ApiToken = "abc123"
1163         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1164
1165         _, err = kc.GetIndex("y", hash[0:3])
1166         c.Check(err, Equals, ErrNoSuchKeepServer)
1167 }
1168
1169 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1170         st := StubGetIndexHandler{
1171                 c,
1172                 "/index/abcd",
1173                 "abc123",
1174                 http.StatusOK,
1175                 []byte("\n")}
1176
1177         ks := RunFakeKeepServer(st)
1178         defer ks.listener.Close()
1179
1180         arv, err := arvadosclient.MakeArvadosClient()
1181         kc, _ := MakeKeepClient(&arv)
1182         arv.ApiToken = "abc123"
1183         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1184
1185         r, err := kc.GetIndex("x", "abcd")
1186         c.Check(err, Equals, nil)
1187
1188         content, err2 := ioutil.ReadAll(r)
1189         c.Check(err2, Equals, nil)
1190         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1191 }
1192
1193 type FailThenSucceedPutHandler struct {
1194         handled        chan string
1195         count          int
1196         successhandler StubPutHandler
1197 }
1198
1199 func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1200         if h.count == 0 {
1201                 resp.WriteHeader(500)
1202                 h.count += 1
1203                 h.handled <- fmt.Sprintf("http://%s", req.Host)
1204         } else {
1205                 h.successhandler.ServeHTTP(resp, req)
1206         }
1207 }
1208
1209 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1210         st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
1211                 StubPutHandler{
1212                         c,
1213                         Md5String("foo"),
1214                         "abc123",
1215                         "foo",
1216                         make(chan string, 5)}}
1217
1218         arv, _ := arvadosclient.MakeArvadosClient()
1219         kc, _ := MakeKeepClient(&arv)
1220
1221         kc.Want_replicas = 2
1222         arv.ApiToken = "abc123"
1223         localRoots := make(map[string]string)
1224         writableLocalRoots := make(map[string]string)
1225
1226         ks := RunSomeFakeKeepServers(st, 2)
1227
1228         for i, k := range ks {
1229                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1230                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1231                 defer k.listener.Close()
1232         }
1233
1234         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1235
1236         hash, replicas, err := kc.PutB([]byte("foo"))
1237
1238         c.Check(err, Equals, nil)
1239         c.Check(hash, Equals, "")
1240         c.Check(replicas, Equals, 2)
1241 }