Merge branch 'master' into 7492-keepproxy-upstream-errors
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9         "git.curoverse.com/arvados.git/sdk/go/streamer"
10         . "gopkg.in/check.v1"
11         "io"
12         "io/ioutil"
13         "log"
14         "net"
15         "net/http"
16         "os"
17         "strings"
18         "testing"
19 )
20
21 // Gocheck boilerplate
22 func Test(t *testing.T) {
23         TestingT(t)
24 }
25
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
29
// no_server is the -no-server command line flag. When set,
// ServerRequiredSuite is skipped and no servers are started or stopped.
var no_server = flag.Bool("no-server", false, "Skip 'ServerRequiredSuite'")
31
type (
	// ServerRequiredSuite holds the tests that need a running Keep
	// server.
	ServerRequiredSuite struct{}

	// StandaloneSuite holds the standalone tests.
	StandaloneSuite struct{}
)
37
// pythonDir returns "<working directory>/../../python/tests". The error
// from os.Getwd is discarded.
func pythonDir() string {
	const rel = "../../python/tests"
	wd, _ := os.Getwd()
	return fmt.Sprintf("%s/%s", wd, rel)
}
42
43 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44         if *no_server {
45                 c.Skip("Skipping tests that require server")
46                 return
47         }
48         arvadostest.StartAPI()
49         arvadostest.StartKeep(2, false)
50 }
51
52 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
53         if *no_server {
54                 return
55         }
56         arvadostest.StopKeep(2)
57         arvadostest.StopAPI()
58 }
59
60 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
61         arv, err := arvadosclient.MakeArvadosClient()
62         c.Assert(err, Equals, nil)
63
64         kc, err := MakeKeepClient(&arv)
65
66         c.Assert(err, Equals, nil)
67         c.Check(len(kc.LocalRoots()), Equals, 2)
68         for _, root := range kc.LocalRoots() {
69                 c.Check(root, Matches, "http://localhost:\\d+")
70         }
71 }
72
73 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
74         arv, err := arvadosclient.MakeArvadosClient()
75         c.Assert(err, Equals, nil)
76
77         kc, err := MakeKeepClient(&arv)
78         c.Assert(kc.Want_replicas, Equals, 2)
79
80         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
81         kc, err = MakeKeepClient(&arv)
82         c.Assert(kc.Want_replicas, Equals, 3)
83
84         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
85         kc, err = MakeKeepClient(&arv)
86         c.Assert(kc.Want_replicas, Equals, 1)
87 }
88
89 type StubPutHandler struct {
90         c              *C
91         expectPath     string
92         expectApiToken string
93         expectBody     string
94         handled        chan string
95 }
96
97 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
98         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
99         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
100         body, err := ioutil.ReadAll(req.Body)
101         sph.c.Check(err, Equals, nil)
102         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
103         resp.WriteHeader(200)
104         sph.handled <- fmt.Sprintf("http://%s", req.Host)
105 }
106
107 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
108         var err error
109         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
110         if err != nil {
111                 panic(fmt.Sprintf("Could not listen on any port"))
112         }
113         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
114         go http.Serve(ks.listener, st)
115         return
116 }
117
118 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
119         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
120
121         ks := RunFakeKeepServer(st)
122         defer ks.listener.Close()
123
124         arv, _ := arvadosclient.MakeArvadosClient()
125         arv.ApiToken = "abc123"
126
127         kc, _ := MakeKeepClient(&arv)
128
129         reader, writer := io.Pipe()
130         upload_status := make(chan uploadStatus)
131
132         f(kc, ks.url, reader, writer, upload_status)
133 }
134
135 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
136         log.Printf("TestUploadToStubKeepServer")
137
138         st := StubPutHandler{
139                 c,
140                 "acbd18db4cc2f85cedef654fccc4a4d8",
141                 "abc123",
142                 "foo",
143                 make(chan string)}
144
145         UploadToStubHelper(c, st,
146                 func(kc *KeepClient, url string, reader io.ReadCloser,
147                         writer io.WriteCloser, upload_status chan uploadStatus) {
148
149                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
150
151                         writer.Write([]byte("foo"))
152                         writer.Close()
153
154                         <-st.handled
155                         status := <-upload_status
156                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
157                 })
158
159         log.Printf("TestUploadToStubKeepServer done")
160 }
161
162 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
163         log.Printf("TestUploadToStubKeepServerBufferReader")
164
165         st := StubPutHandler{
166                 c,
167                 "acbd18db4cc2f85cedef654fccc4a4d8",
168                 "abc123",
169                 "foo",
170                 make(chan string)}
171
172         UploadToStubHelper(c, st,
173                 func(kc *KeepClient, url string, reader io.ReadCloser,
174                         writer io.WriteCloser, upload_status chan uploadStatus) {
175
176                         tr := streamer.AsyncStreamFromReader(512, reader)
177                         defer tr.Close()
178
179                         br1 := tr.MakeStreamReader()
180
181                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
182
183                         writer.Write([]byte("foo"))
184                         writer.Close()
185
186                         <-st.handled
187
188                         status := <-upload_status
189                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
190                 })
191
192         log.Printf("TestUploadToStubKeepServerBufferReader done")
193 }
194
// FailHandler is an http.Handler that answers every request with
// status 500.
type FailHandler struct {
	handled chan string // receives "http://<host>" once per request
}

// ServeHTTP writes a 500 status and then sends "http://<request host>"
// on fh.handled.
func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
203
204 type FailThenSucceedHandler struct {
205         handled        chan string
206         count          int
207         successhandler StubGetHandler
208 }
209
210 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
211         if fh.count == 0 {
212                 resp.WriteHeader(500)
213                 fh.count += 1
214                 fh.handled <- fmt.Sprintf("http://%s", req.Host)
215         } else {
216                 fh.successhandler.ServeHTTP(resp, req)
217         }
218 }
219
// Error404Handler is an http.Handler that answers every request with
// status 404.
type Error404Handler struct {
	handled chan string // receives "http://<host>" once per request
}

// ServeHTTP writes a 404 status and then sends "http://<request host>"
// on fh.handled.
func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusNotFound)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
228
229 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
230         log.Printf("TestFailedUploadToStubKeepServer")
231
232         st := FailHandler{
233                 make(chan string)}
234
235         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
236
237         UploadToStubHelper(c, st,
238                 func(kc *KeepClient, url string, reader io.ReadCloser,
239                         writer io.WriteCloser, upload_status chan uploadStatus) {
240
241                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
242
243                         writer.Write([]byte("foo"))
244                         writer.Close()
245
246                         <-st.handled
247
248                         status := <-upload_status
249                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
250                         c.Check(status.statusCode, Equals, 500)
251                 })
252         log.Printf("TestFailedUploadToStubKeepServer done")
253 }
254
// KeepServer describes one fake Keep server started by the tests.
type KeepServer struct {
	// listener is the socket the fake server accepts on; tests close
	// it to shut the server down.
	listener net.Listener
	// url is the server's base URL, "http://<listener address>".
	url string
}
259
260 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
261         ks = make([]KeepServer, n)
262
263         for i := 0; i < n; i += 1 {
264                 ks[i] = RunFakeKeepServer(st)
265         }
266
267         return ks
268 }
269
270 func (s *StandaloneSuite) TestPutB(c *C) {
271         log.Printf("TestPutB")
272
273         hash := Md5String("foo")
274
275         st := StubPutHandler{
276                 c,
277                 hash,
278                 "abc123",
279                 "foo",
280                 make(chan string, 5)}
281
282         arv, _ := arvadosclient.MakeArvadosClient()
283         kc, _ := MakeKeepClient(&arv)
284
285         kc.Want_replicas = 2
286         arv.ApiToken = "abc123"
287         localRoots := make(map[string]string)
288         writableLocalRoots := make(map[string]string)
289
290         ks := RunSomeFakeKeepServers(st, 5)
291
292         for i, k := range ks {
293                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
294                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
295                 defer k.listener.Close()
296         }
297
298         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
299
300         kc.PutB([]byte("foo"))
301
302         shuff := NewRootSorter(
303                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
304
305         s1 := <-st.handled
306         s2 := <-st.handled
307         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
308                 (s1 == shuff[1] && s2 == shuff[0]),
309                 Equals,
310                 true)
311
312         log.Printf("TestPutB done")
313 }
314
315 func (s *StandaloneSuite) TestPutHR(c *C) {
316         log.Printf("TestPutHR")
317
318         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
319
320         st := StubPutHandler{
321                 c,
322                 hash,
323                 "abc123",
324                 "foo",
325                 make(chan string, 5)}
326
327         arv, _ := arvadosclient.MakeArvadosClient()
328         kc, _ := MakeKeepClient(&arv)
329
330         kc.Want_replicas = 2
331         arv.ApiToken = "abc123"
332         localRoots := make(map[string]string)
333         writableLocalRoots := make(map[string]string)
334
335         ks := RunSomeFakeKeepServers(st, 5)
336
337         for i, k := range ks {
338                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
339                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
340                 defer k.listener.Close()
341         }
342
343         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
344
345         reader, writer := io.Pipe()
346
347         go func() {
348                 writer.Write([]byte("foo"))
349                 writer.Close()
350         }()
351
352         kc.PutHR(hash, reader, 3)
353
354         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
355         log.Print(shuff)
356
357         s1 := <-st.handled
358         s2 := <-st.handled
359
360         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
361                 (s1 == shuff[1] && s2 == shuff[0]),
362                 Equals,
363                 true)
364
365         log.Printf("TestPutHR done")
366 }
367
368 func (s *StandaloneSuite) TestPutWithFail(c *C) {
369         log.Printf("TestPutWithFail")
370
371         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
372
373         st := StubPutHandler{
374                 c,
375                 hash,
376                 "abc123",
377                 "foo",
378                 make(chan string, 4)}
379
380         fh := FailHandler{
381                 make(chan string, 1)}
382
383         arv, err := arvadosclient.MakeArvadosClient()
384         kc, _ := MakeKeepClient(&arv)
385
386         kc.Want_replicas = 2
387         arv.ApiToken = "abc123"
388         localRoots := make(map[string]string)
389         writableLocalRoots := make(map[string]string)
390
391         ks1 := RunSomeFakeKeepServers(st, 4)
392         ks2 := RunSomeFakeKeepServers(fh, 1)
393
394         for i, k := range ks1 {
395                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
396                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
397                 defer k.listener.Close()
398         }
399         for i, k := range ks2 {
400                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
401                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
402                 defer k.listener.Close()
403         }
404
405         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
406
407         shuff := NewRootSorter(
408                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
409
410         phash, replicas, err := kc.PutB([]byte("foo"))
411
412         <-fh.handled
413
414         c.Check(err, Equals, nil)
415         c.Check(phash, Equals, "")
416         c.Check(replicas, Equals, 2)
417
418         s1 := <-st.handled
419         s2 := <-st.handled
420
421         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
422                 (s1 == shuff[2] && s2 == shuff[1]),
423                 Equals,
424                 true)
425 }
426
427 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
428         log.Printf("TestPutWithTooManyFail")
429
430         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
431
432         st := StubPutHandler{
433                 c,
434                 hash,
435                 "abc123",
436                 "foo",
437                 make(chan string, 1)}
438
439         fh := FailHandler{
440                 make(chan string, 4)}
441
442         arv, err := arvadosclient.MakeArvadosClient()
443         kc, _ := MakeKeepClient(&arv)
444
445         kc.Want_replicas = 2
446         kc.Retries = 0
447         arv.ApiToken = "abc123"
448         localRoots := make(map[string]string)
449         writableLocalRoots := make(map[string]string)
450
451         ks1 := RunSomeFakeKeepServers(st, 1)
452         ks2 := RunSomeFakeKeepServers(fh, 4)
453
454         for i, k := range ks1 {
455                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
456                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
457                 defer k.listener.Close()
458         }
459         for i, k := range ks2 {
460                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
461                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
462                 defer k.listener.Close()
463         }
464
465         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
466
467         _, replicas, err := kc.PutB([]byte("foo"))
468
469         c.Check(err, Equals, InsufficientReplicasError)
470         c.Check(replicas, Equals, 1)
471         c.Check(<-st.handled, Equals, ks1[0].url)
472
473         log.Printf("TestPutWithTooManyFail done")
474 }
475
476 type StubGetHandler struct {
477         c              *C
478         expectPath     string
479         expectApiToken string
480         httpStatus     int
481         body           []byte
482 }
483
484 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
485         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
486         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
487         resp.WriteHeader(sgh.httpStatus)
488         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
489         resp.Write(sgh.body)
490 }
491
492 func (s *StandaloneSuite) TestGet(c *C) {
493         log.Printf("TestGet")
494
495         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
496
497         st := StubGetHandler{
498                 c,
499                 hash,
500                 "abc123",
501                 http.StatusOK,
502                 []byte("foo")}
503
504         ks := RunFakeKeepServer(st)
505         defer ks.listener.Close()
506
507         arv, err := arvadosclient.MakeArvadosClient()
508         kc, _ := MakeKeepClient(&arv)
509         arv.ApiToken = "abc123"
510         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
511
512         r, n, url2, err := kc.Get(hash)
513         defer r.Close()
514         c.Check(err, Equals, nil)
515         c.Check(n, Equals, int64(3))
516         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
517
518         content, err2 := ioutil.ReadAll(r)
519         c.Check(err2, Equals, nil)
520         c.Check(content, DeepEquals, []byte("foo"))
521
522         log.Printf("TestGet done")
523 }
524
525 func (s *StandaloneSuite) TestGet404(c *C) {
526         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
527
528         st := Error404Handler{make(chan string, 1)}
529
530         ks := RunFakeKeepServer(st)
531         defer ks.listener.Close()
532
533         arv, err := arvadosclient.MakeArvadosClient()
534         kc, _ := MakeKeepClient(&arv)
535         arv.ApiToken = "abc123"
536         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
537
538         r, n, url2, err := kc.Get(hash)
539         c.Check(err, Equals, BlockNotFound)
540         c.Check(n, Equals, int64(0))
541         c.Check(url2, Equals, "")
542         c.Check(r, Equals, nil)
543 }
544
545 func (s *StandaloneSuite) TestGetFail(c *C) {
546         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
547
548         st := FailHandler{make(chan string, 1)}
549
550         ks := RunFakeKeepServer(st)
551         defer ks.listener.Close()
552
553         arv, err := arvadosclient.MakeArvadosClient()
554         kc, _ := MakeKeepClient(&arv)
555         arv.ApiToken = "abc123"
556         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
557         kc.Retries = 0
558
559         r, n, url2, err := kc.Get(hash)
560         errNotFound, _ := err.(ErrNotFound)
561         c.Check(errNotFound, NotNil)
562         c.Check(strings.Contains(err.Error(), "use of closed network connection"), Equals, true)
563         c.Check(n, Equals, int64(0))
564         c.Check(url2, Equals, "")
565         c.Check(r, Equals, nil)
566 }
567
568 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
569         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
570
571         st := &FailThenSucceedHandler{make(chan string, 1), 0,
572                 StubGetHandler{
573                         c,
574                         hash,
575                         "abc123",
576                         http.StatusOK,
577                         []byte("foo")}}
578
579         ks := RunFakeKeepServer(st)
580         defer ks.listener.Close()
581
582         arv, err := arvadosclient.MakeArvadosClient()
583         kc, _ := MakeKeepClient(&arv)
584         arv.ApiToken = "abc123"
585         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
586
587         r, n, url2, err := kc.Get(hash)
588         defer r.Close()
589         c.Check(err, Equals, nil)
590         c.Check(n, Equals, int64(3))
591         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
592
593         content, err2 := ioutil.ReadAll(r)
594         c.Check(err2, Equals, nil)
595         c.Check(content, DeepEquals, []byte("foo"))
596 }
597
598 func (s *StandaloneSuite) TestGetNetError(c *C) {
599         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
600
601         arv, err := arvadosclient.MakeArvadosClient()
602         kc, _ := MakeKeepClient(&arv)
603         arv.ApiToken = "abc123"
604         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
605
606         r, n, url2, err := kc.Get(hash)
607         errNotFound, _ := err.(ErrNotFound)
608         c.Check(errNotFound, NotNil)
609         c.Check(strings.Contains(err.Error(), "connection refused"), Equals, true)
610         c.Check(n, Equals, int64(0))
611         c.Check(url2, Equals, "")
612         c.Check(r, Equals, nil)
613 }
614
615 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
616         uuid := "zzzzz-bi6l4-123451234512345"
617         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
618
619         // This one shouldn't be used:
620         ks0 := RunFakeKeepServer(StubGetHandler{
621                 c,
622                 "error if used",
623                 "abc123",
624                 http.StatusOK,
625                 []byte("foo")})
626         defer ks0.listener.Close()
627         // This one should be used:
628         ks := RunFakeKeepServer(StubGetHandler{
629                 c,
630                 hash + "+K@" + uuid,
631                 "abc123",
632                 http.StatusOK,
633                 []byte("foo")})
634         defer ks.listener.Close()
635
636         arv, err := arvadosclient.MakeArvadosClient()
637         kc, _ := MakeKeepClient(&arv)
638         arv.ApiToken = "abc123"
639         kc.SetServiceRoots(
640                 map[string]string{"x": ks0.url},
641                 nil,
642                 map[string]string{uuid: ks.url})
643
644         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
645         defer r.Close()
646         c.Check(err, Equals, nil)
647         c.Check(n, Equals, int64(3))
648         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
649
650         content, err := ioutil.ReadAll(r)
651         c.Check(err, Equals, nil)
652         c.Check(content, DeepEquals, []byte("foo"))
653 }
654
655 // Use a service hint to fetch from a local disk service, overriding
656 // rendezvous probe order.
657 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
658         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
659         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
660
661         // This one shouldn't be used, although it appears first in
662         // rendezvous probe order:
663         ks0 := RunFakeKeepServer(StubGetHandler{
664                 c,
665                 "error if used",
666                 "abc123",
667                 http.StatusOK,
668                 []byte("foo")})
669         defer ks0.listener.Close()
670         // This one should be used:
671         ks := RunFakeKeepServer(StubGetHandler{
672                 c,
673                 hash + "+K@" + uuid,
674                 "abc123",
675                 http.StatusOK,
676                 []byte("foo")})
677         defer ks.listener.Close()
678
679         arv, err := arvadosclient.MakeArvadosClient()
680         kc, _ := MakeKeepClient(&arv)
681         arv.ApiToken = "abc123"
682         kc.SetServiceRoots(
683                 map[string]string{
684                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
685                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
686                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
687                         uuid: ks.url},
688                 nil,
689                 map[string]string{
690                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
691                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
692                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
693                         uuid: ks.url},
694         )
695
696         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
697         defer r.Close()
698         c.Check(err, Equals, nil)
699         c.Check(n, Equals, int64(3))
700         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
701
702         content, err := ioutil.ReadAll(r)
703         c.Check(err, Equals, nil)
704         c.Check(content, DeepEquals, []byte("foo"))
705 }
706
707 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
708         uuid := "zzzzz-bi6l4-123451234512345"
709         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
710
711         ksLocal := RunFakeKeepServer(StubGetHandler{
712                 c,
713                 hash + "+K@" + uuid,
714                 "abc123",
715                 http.StatusOK,
716                 []byte("foo")})
717         defer ksLocal.listener.Close()
718         ksGateway := RunFakeKeepServer(StubGetHandler{
719                 c,
720                 hash + "+K@" + uuid,
721                 "abc123",
722                 http.StatusInternalServerError,
723                 []byte("Error")})
724         defer ksGateway.listener.Close()
725
726         arv, err := arvadosclient.MakeArvadosClient()
727         kc, _ := MakeKeepClient(&arv)
728         arv.ApiToken = "abc123"
729         kc.SetServiceRoots(
730                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
731                 nil,
732                 map[string]string{uuid: ksGateway.url})
733
734         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
735         c.Assert(err, Equals, nil)
736         defer r.Close()
737         c.Check(n, Equals, int64(3))
738         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
739
740         content, err := ioutil.ReadAll(r)
741         c.Check(err, Equals, nil)
742         c.Check(content, DeepEquals, []byte("foo"))
743 }
744
// BarHandler is an http.Handler that answers every request with the
// body "bar".
type BarHandler struct {
	handled chan string // receives "http://<host>" once per request
}

// ServeHTTP writes "bar" as the response body and then sends
// "http://<request host>" on bh.handled.
func (bh BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.Write([]byte("bar"))
	origin := fmt.Sprintf("http://%s", req.Host)
	bh.handled <- origin
}
753
754 func (s *StandaloneSuite) TestChecksum(c *C) {
755         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
756         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
757
758         st := BarHandler{make(chan string, 1)}
759
760         ks := RunFakeKeepServer(st)
761         defer ks.listener.Close()
762
763         arv, err := arvadosclient.MakeArvadosClient()
764         kc, _ := MakeKeepClient(&arv)
765         arv.ApiToken = "abc123"
766         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
767
768         r, n, _, err := kc.Get(barhash)
769         _, err = ioutil.ReadAll(r)
770         c.Check(n, Equals, int64(3))
771         c.Check(err, Equals, nil)
772
773         <-st.handled
774
775         r, n, _, err = kc.Get(foohash)
776         _, err = ioutil.ReadAll(r)
777         c.Check(n, Equals, int64(3))
778         c.Check(err, Equals, BadChecksum)
779
780         <-st.handled
781 }
782
783 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
784         content := []byte("waz")
785         hash := fmt.Sprintf("%x", md5.Sum(content))
786
787         fh := Error404Handler{
788                 make(chan string, 4)}
789
790         st := StubGetHandler{
791                 c,
792                 hash,
793                 "abc123",
794                 http.StatusOK,
795                 content}
796
797         arv, err := arvadosclient.MakeArvadosClient()
798         kc, _ := MakeKeepClient(&arv)
799         arv.ApiToken = "abc123"
800         localRoots := make(map[string]string)
801         writableLocalRoots := make(map[string]string)
802
803         ks1 := RunSomeFakeKeepServers(st, 1)
804         ks2 := RunSomeFakeKeepServers(fh, 4)
805
806         for i, k := range ks1 {
807                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
808                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
809                 defer k.listener.Close()
810         }
811         for i, k := range ks2 {
812                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
813                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
814                 defer k.listener.Close()
815         }
816
817         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
818         kc.Retries = 0
819
820         // This test works only if one of the failing services is
821         // attempted before the succeeding service. Otherwise,
822         // <-fh.handled below will just hang! (Probe order depends on
823         // the choice of block content "waz" and the UUIDs of the fake
824         // servers, so we just tried different strings until we found
825         // an example that passes this Assert.)
826         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
827
828         r, n, url2, err := kc.Get(hash)
829
830         <-fh.handled
831         c.Check(err, Equals, nil)
832         c.Check(n, Equals, int64(3))
833         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
834
835         read_content, err2 := ioutil.ReadAll(r)
836         c.Check(err2, Equals, nil)
837         c.Check(read_content, DeepEquals, content)
838 }
839
840 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
841         content := []byte("TestPutGetHead")
842
843         arv, err := arvadosclient.MakeArvadosClient()
844         kc, err := MakeKeepClient(&arv)
845         c.Assert(err, Equals, nil)
846
847         hash := fmt.Sprintf("%x", md5.Sum(content))
848
849         {
850                 n, _, err := kc.Ask(hash)
851                 c.Check(err, Equals, BlockNotFound)
852                 c.Check(n, Equals, int64(0))
853         }
854         {
855                 hash2, replicas, err := kc.PutB(content)
856                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
857                 c.Check(replicas, Equals, 2)
858                 c.Check(err, Equals, nil)
859         }
860         {
861                 r, n, url2, err := kc.Get(hash)
862                 c.Check(err, Equals, nil)
863                 c.Check(n, Equals, int64(len(content)))
864                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
865
866                 read_content, err2 := ioutil.ReadAll(r)
867                 c.Check(err2, Equals, nil)
868                 c.Check(read_content, DeepEquals, content)
869         }
870         {
871                 n, url2, err := kc.Ask(hash)
872                 c.Check(err, Equals, nil)
873                 c.Check(n, Equals, int64(len(content)))
874                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
875         }
876 }
877
// StubProxyHandler is an http.Handler that answers every request with
// the header "X-Keep-Replicas-Stored: 2".
type StubProxyHandler struct {
	handled chan string // receives "http://<host>" once per request
}

// ServeHTTP sets the X-Keep-Replicas-Stored header to "2" and then
// sends "http://<request host>" on ph.handled. No status or body is
// written explicitly.
func (ph StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.Header().Set("X-Keep-Replicas-Stored", "2")
	origin := fmt.Sprintf("http://%s", req.Host)
	ph.handled <- origin
}
886
887 func (s *StandaloneSuite) TestPutProxy(c *C) {
888         log.Printf("TestPutProxy")
889
890         st := StubProxyHandler{make(chan string, 1)}
891
892         arv, err := arvadosclient.MakeArvadosClient()
893         kc, _ := MakeKeepClient(&arv)
894
895         kc.Want_replicas = 2
896         kc.Using_proxy = true
897         arv.ApiToken = "abc123"
898         localRoots := make(map[string]string)
899         writableLocalRoots := make(map[string]string)
900
901         ks1 := RunSomeFakeKeepServers(st, 1)
902
903         for i, k := range ks1 {
904                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
905                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
906                 defer k.listener.Close()
907         }
908
909         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
910
911         _, replicas, err := kc.PutB([]byte("foo"))
912         <-st.handled
913
914         c.Check(err, Equals, nil)
915         c.Check(replicas, Equals, 2)
916
917         log.Printf("TestPutProxy done")
918 }
919
920 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
921         log.Printf("TestPutProxy")
922
923         st := StubProxyHandler{make(chan string, 1)}
924
925         arv, err := arvadosclient.MakeArvadosClient()
926         kc, _ := MakeKeepClient(&arv)
927
928         kc.Want_replicas = 3
929         kc.Using_proxy = true
930         arv.ApiToken = "abc123"
931         localRoots := make(map[string]string)
932         writableLocalRoots := make(map[string]string)
933
934         ks1 := RunSomeFakeKeepServers(st, 1)
935
936         for i, k := range ks1 {
937                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
938                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
939                 defer k.listener.Close()
940         }
941         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
942
943         _, replicas, err := kc.PutB([]byte("foo"))
944         <-st.handled
945
946         c.Check(err, Equals, InsufficientReplicasError)
947         c.Check(replicas, Equals, 2)
948
949         log.Printf("TestPutProxy done")
950 }
951
952 func (s *StandaloneSuite) TestMakeLocator(c *C) {
953         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
954         c.Check(err, Equals, nil)
955         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
956         c.Check(l.Size, Equals, 3)
957         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
958 }
959
960 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
961         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
962         c.Check(err, Equals, nil)
963         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
964         c.Check(l.Size, Equals, -1)
965         c.Check(l.Hints, DeepEquals, []string{})
966 }
967
968 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
969         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
970         c.Check(err, Equals, nil)
971         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
972         c.Check(l.Size, Equals, -1)
973         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
974 }
975
976 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
977         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
978         l, err := MakeLocator(str)
979         c.Check(err, Equals, nil)
980         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
981         c.Check(l.Size, Equals, 3)
982         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
983         c.Check(l.String(), Equals, str)
984 }
985
986 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
987         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
988         c.Check(err, Equals, InvalidLocatorError)
989 }
990
991 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
992         hash := Md5String("foo")
993
994         st := StubPutHandler{
995                 c,
996                 hash,
997                 "abc123",
998                 "foo",
999                 make(chan string, 5)}
1000
1001         arv, _ := arvadosclient.MakeArvadosClient()
1002         kc, _ := MakeKeepClient(&arv)
1003
1004         kc.Want_replicas = 2
1005         arv.ApiToken = "abc123"
1006         localRoots := make(map[string]string)
1007         writableLocalRoots := make(map[string]string)
1008
1009         ks := RunSomeFakeKeepServers(st, 5)
1010
1011         for i, k := range ks {
1012                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1013                 if i == 0 {
1014                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1015                 }
1016                 defer k.listener.Close()
1017         }
1018
1019         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1020
1021         _, replicas, err := kc.PutB([]byte("foo"))
1022
1023         c.Check(err, Equals, InsufficientReplicasError)
1024         c.Check(replicas, Equals, 1)
1025
1026         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1027 }
1028
1029 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1030         hash := Md5String("foo")
1031
1032         st := StubPutHandler{
1033                 c,
1034                 hash,
1035                 "abc123",
1036                 "foo",
1037                 make(chan string, 5)}
1038
1039         arv, _ := arvadosclient.MakeArvadosClient()
1040         kc, _ := MakeKeepClient(&arv)
1041
1042         kc.Want_replicas = 2
1043         arv.ApiToken = "abc123"
1044         localRoots := make(map[string]string)
1045         writableLocalRoots := make(map[string]string)
1046
1047         ks := RunSomeFakeKeepServers(st, 5)
1048
1049         for i, k := range ks {
1050                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1051                 defer k.listener.Close()
1052         }
1053
1054         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1055
1056         _, replicas, err := kc.PutB([]byte("foo"))
1057
1058         c.Check(err, Equals, InsufficientReplicasError)
1059         c.Check(replicas, Equals, 0)
1060 }
1061
1062 type StubGetIndexHandler struct {
1063         c              *C
1064         expectPath     string
1065         expectAPIToken string
1066         httpStatus     int
1067         body           []byte
1068 }
1069
1070 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1071         h.c.Check(req.URL.Path, Equals, h.expectPath)
1072         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1073         resp.WriteHeader(h.httpStatus)
1074         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1075         resp.Write(h.body)
1076 }
1077
1078 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1079         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1080
1081         st := StubGetIndexHandler{
1082                 c,
1083                 "/index",
1084                 "abc123",
1085                 http.StatusOK,
1086                 []byte(hash + "+3 1443559274\n\n")}
1087
1088         ks := RunFakeKeepServer(st)
1089         defer ks.listener.Close()
1090
1091         arv, err := arvadosclient.MakeArvadosClient()
1092         kc, _ := MakeKeepClient(&arv)
1093         arv.ApiToken = "abc123"
1094         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1095
1096         r, err := kc.GetIndex("x", "")
1097         c.Check(err, Equals, nil)
1098
1099         content, err2 := ioutil.ReadAll(r)
1100         c.Check(err2, Equals, nil)
1101         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1102 }
1103
1104 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1105         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1106
1107         st := StubGetIndexHandler{
1108                 c,
1109                 "/index/" + hash[0:3],
1110                 "abc123",
1111                 http.StatusOK,
1112                 []byte(hash + "+3 1443559274\n\n")}
1113
1114         ks := RunFakeKeepServer(st)
1115         defer ks.listener.Close()
1116
1117         arv, err := arvadosclient.MakeArvadosClient()
1118         kc, _ := MakeKeepClient(&arv)
1119         arv.ApiToken = "abc123"
1120         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1121
1122         r, err := kc.GetIndex("x", hash[0:3])
1123         c.Check(err, Equals, nil)
1124
1125         content, err2 := ioutil.ReadAll(r)
1126         c.Check(err2, Equals, nil)
1127         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1128 }
1129
1130 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1131         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1132
1133         st := StubGetIndexHandler{
1134                 c,
1135                 "/index/" + hash[0:3],
1136                 "abc123",
1137                 http.StatusOK,
1138                 []byte(hash)}
1139
1140         ks := RunFakeKeepServer(st)
1141         defer ks.listener.Close()
1142
1143         arv, err := arvadosclient.MakeArvadosClient()
1144         kc, _ := MakeKeepClient(&arv)
1145         arv.ApiToken = "abc123"
1146         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1147
1148         _, err = kc.GetIndex("x", hash[0:3])
1149         c.Check(err, Equals, ErrIncompleteIndex)
1150 }
1151
1152 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1153         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1154
1155         st := StubGetIndexHandler{
1156                 c,
1157                 "/index/" + hash[0:3],
1158                 "abc123",
1159                 http.StatusOK,
1160                 []byte(hash)}
1161
1162         ks := RunFakeKeepServer(st)
1163         defer ks.listener.Close()
1164
1165         arv, err := arvadosclient.MakeArvadosClient()
1166         kc, _ := MakeKeepClient(&arv)
1167         arv.ApiToken = "abc123"
1168         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1169
1170         _, err = kc.GetIndex("y", hash[0:3])
1171         c.Check(err, Equals, ErrNoSuchKeepServer)
1172 }
1173
1174 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1175         st := StubGetIndexHandler{
1176                 c,
1177                 "/index/abcd",
1178                 "abc123",
1179                 http.StatusOK,
1180                 []byte("\n")}
1181
1182         ks := RunFakeKeepServer(st)
1183         defer ks.listener.Close()
1184
1185         arv, err := arvadosclient.MakeArvadosClient()
1186         kc, _ := MakeKeepClient(&arv)
1187         arv.ApiToken = "abc123"
1188         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1189
1190         r, err := kc.GetIndex("x", "abcd")
1191         c.Check(err, Equals, nil)
1192
1193         content, err2 := ioutil.ReadAll(r)
1194         c.Check(err2, Equals, nil)
1195         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1196 }
1197
1198 type FailThenSucceedPutHandler struct {
1199         handled        chan string
1200         count          int
1201         successhandler StubPutHandler
1202 }
1203
1204 func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1205         if h.count == 0 {
1206                 resp.WriteHeader(500)
1207                 h.count += 1
1208                 h.handled <- fmt.Sprintf("http://%s", req.Host)
1209         } else {
1210                 h.successhandler.ServeHTTP(resp, req)
1211         }
1212 }
1213
1214 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1215         st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
1216                 StubPutHandler{
1217                         c,
1218                         Md5String("foo"),
1219                         "abc123",
1220                         "foo",
1221                         make(chan string, 5)}}
1222
1223         arv, _ := arvadosclient.MakeArvadosClient()
1224         kc, _ := MakeKeepClient(&arv)
1225
1226         kc.Want_replicas = 2
1227         arv.ApiToken = "abc123"
1228         localRoots := make(map[string]string)
1229         writableLocalRoots := make(map[string]string)
1230
1231         ks := RunSomeFakeKeepServers(st, 2)
1232
1233         for i, k := range ks {
1234                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1235                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1236                 defer k.listener.Close()
1237         }
1238
1239         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1240
1241         hash, replicas, err := kc.PutB([]byte("foo"))
1242
1243         c.Check(err, Equals, nil)
1244         c.Check(hash, Equals, "")
1245         c.Check(replicas, Equals, 2)
1246 }