Merge branch 'master' into 7492-keepproxy-upstream-errors
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9         "git.curoverse.com/arvados.git/sdk/go/streamer"
10         . "gopkg.in/check.v1"
11         "io"
12         "io/ioutil"
13         "log"
14         "net"
15         "net/http"
16         "os"
17         "strings"
18         "testing"
19 )
20
21 // Gocheck boilerplate
22 func Test(t *testing.T) {
23         TestingT(t)
24 }
25
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
29
30 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31
32 // Tests that require the Keep server running
33 type ServerRequiredSuite struct{}
34
35 // Standalone tests
36 type StandaloneSuite struct{}
37
// pythonDir returns the current working directory with
// "/../../python/tests" appended.
func pythonDir() string {
	// The Getwd error is dropped: on failure wd is empty and only the
	// relative suffix is returned.
	wd, _ := os.Getwd()
	return fmt.Sprintf("%s%s", wd, "/../../python/tests")
}
42
43 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44         if *no_server {
45                 c.Skip("Skipping tests that require server")
46                 return
47         }
48         arvadostest.StartAPI()
49         arvadostest.StartKeep(2, false)
50 }
51
52 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
53         if *no_server {
54                 return
55         }
56         arvadostest.StopKeep(2)
57         arvadostest.StopAPI()
58 }
59
60 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
61         arv, err := arvadosclient.MakeArvadosClient()
62         c.Assert(err, Equals, nil)
63
64         kc, err := MakeKeepClient(&arv)
65
66         c.Assert(err, Equals, nil)
67         c.Check(len(kc.LocalRoots()), Equals, 2)
68         for _, root := range kc.LocalRoots() {
69                 c.Check(root, Matches, "http://localhost:\\d+")
70         }
71 }
72
73 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
74         arv, err := arvadosclient.MakeArvadosClient()
75         c.Assert(err, Equals, nil)
76
77         kc, err := MakeKeepClient(&arv)
78         c.Assert(kc.Want_replicas, Equals, 2)
79
80         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
81         kc, err = MakeKeepClient(&arv)
82         c.Assert(kc.Want_replicas, Equals, 3)
83
84         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
85         kc, err = MakeKeepClient(&arv)
86         c.Assert(kc.Want_replicas, Equals, 1)
87 }
88
89 type StubPutHandler struct {
90         c              *C
91         expectPath     string
92         expectApiToken string
93         expectBody     string
94         handled        chan string
95 }
96
97 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
98         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
99         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
100         body, err := ioutil.ReadAll(req.Body)
101         sph.c.Check(err, Equals, nil)
102         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
103         resp.WriteHeader(200)
104         sph.handled <- fmt.Sprintf("http://%s", req.Host)
105 }
106
107 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
108         var err error
109         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
110         if err != nil {
111                 panic(fmt.Sprintf("Could not listen on any port"))
112         }
113         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
114         go http.Serve(ks.listener, st)
115         return
116 }
117
118 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
119         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
120
121         ks := RunFakeKeepServer(st)
122         defer ks.listener.Close()
123
124         arv, _ := arvadosclient.MakeArvadosClient()
125         arv.ApiToken = "abc123"
126
127         kc, _ := MakeKeepClient(&arv)
128
129         reader, writer := io.Pipe()
130         upload_status := make(chan uploadStatus)
131
132         f(kc, ks.url, reader, writer, upload_status)
133 }
134
135 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
136         log.Printf("TestUploadToStubKeepServer")
137
138         st := StubPutHandler{
139                 c,
140                 "acbd18db4cc2f85cedef654fccc4a4d8",
141                 "abc123",
142                 "foo",
143                 make(chan string)}
144
145         UploadToStubHelper(c, st,
146                 func(kc *KeepClient, url string, reader io.ReadCloser,
147                         writer io.WriteCloser, upload_status chan uploadStatus) {
148
149                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
150
151                         writer.Write([]byte("foo"))
152                         writer.Close()
153
154                         <-st.handled
155                         status := <-upload_status
156                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
157                 })
158
159         log.Printf("TestUploadToStubKeepServer done")
160 }
161
162 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
163         log.Printf("TestUploadToStubKeepServerBufferReader")
164
165         st := StubPutHandler{
166                 c,
167                 "acbd18db4cc2f85cedef654fccc4a4d8",
168                 "abc123",
169                 "foo",
170                 make(chan string)}
171
172         UploadToStubHelper(c, st,
173                 func(kc *KeepClient, url string, reader io.ReadCloser,
174                         writer io.WriteCloser, upload_status chan uploadStatus) {
175
176                         tr := streamer.AsyncStreamFromReader(512, reader)
177                         defer tr.Close()
178
179                         br1 := tr.MakeStreamReader()
180
181                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
182
183                         writer.Write([]byte("foo"))
184                         writer.Close()
185
186                         <-st.handled
187
188                         status := <-upload_status
189                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
190                 })
191
192         log.Printf("TestUploadToStubKeepServerBufferReader done")
193 }
194
// FailHandler is an http.Handler that answers every request with status
// 500 and reports the addressed host on handled.
type FailHandler struct {
	handled chan string
}

// ServeHTTP writes a 500 status and then sends "http://<request host>" on
// handled. The send blocks until there is room in (or a receiver on) the
// channel.
func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
203
204 type FailThenSucceedHandler struct {
205         handled        chan string
206         count          int
207         successhandler StubGetHandler
208 }
209
210 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
211         if fh.count == 0 {
212                 resp.WriteHeader(500)
213                 fh.count += 1
214                 fh.handled <- fmt.Sprintf("http://%s", req.Host)
215         } else {
216                 fh.successhandler.ServeHTTP(resp, req)
217         }
218 }
219
// Error404Handler is an http.Handler that answers every request with
// status 404 and reports the addressed host on handled.
type Error404Handler struct {
	handled chan string
}

// ServeHTTP writes a 404 status and then sends "http://<request host>" on
// handled.
func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusNotFound)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
228
229 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
230         log.Printf("TestFailedUploadToStubKeepServer")
231
232         st := FailHandler{
233                 make(chan string)}
234
235         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
236
237         UploadToStubHelper(c, st,
238                 func(kc *KeepClient, url string, reader io.ReadCloser,
239                         writer io.WriteCloser, upload_status chan uploadStatus) {
240
241                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
242
243                         writer.Write([]byte("foo"))
244                         writer.Close()
245
246                         <-st.handled
247
248                         status := <-upload_status
249                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
250                         c.Check(status.statusCode, Equals, 500)
251                 })
252         log.Printf("TestFailedUploadToStubKeepServer done")
253 }
254
// KeepServer describes one fake Keep server started by a test.
type KeepServer struct {
	// listener is the socket the fake server accepts connections on;
	// tests close it to release the port.
	listener net.Listener
	// url is the server's base URL, "http://" followed by the listener address.
	url string
}
259
260 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
261         ks = make([]KeepServer, n)
262
263         for i := 0; i < n; i += 1 {
264                 ks[i] = RunFakeKeepServer(st)
265         }
266
267         return ks
268 }
269
270 func (s *StandaloneSuite) TestPutB(c *C) {
271         log.Printf("TestPutB")
272
273         hash := Md5String("foo")
274
275         st := StubPutHandler{
276                 c,
277                 hash,
278                 "abc123",
279                 "foo",
280                 make(chan string, 5)}
281
282         arv, _ := arvadosclient.MakeArvadosClient()
283         kc, _ := MakeKeepClient(&arv)
284
285         kc.Want_replicas = 2
286         arv.ApiToken = "abc123"
287         localRoots := make(map[string]string)
288         writableLocalRoots := make(map[string]string)
289
290         ks := RunSomeFakeKeepServers(st, 5)
291
292         for i, k := range ks {
293                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
294                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
295                 defer k.listener.Close()
296         }
297
298         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
299
300         kc.PutB([]byte("foo"))
301
302         shuff := NewRootSorter(
303                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
304
305         s1 := <-st.handled
306         s2 := <-st.handled
307         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
308                 (s1 == shuff[1] && s2 == shuff[0]),
309                 Equals,
310                 true)
311
312         log.Printf("TestPutB done")
313 }
314
315 func (s *StandaloneSuite) TestPutHR(c *C) {
316         log.Printf("TestPutHR")
317
318         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
319
320         st := StubPutHandler{
321                 c,
322                 hash,
323                 "abc123",
324                 "foo",
325                 make(chan string, 5)}
326
327         arv, _ := arvadosclient.MakeArvadosClient()
328         kc, _ := MakeKeepClient(&arv)
329
330         kc.Want_replicas = 2
331         arv.ApiToken = "abc123"
332         localRoots := make(map[string]string)
333         writableLocalRoots := make(map[string]string)
334
335         ks := RunSomeFakeKeepServers(st, 5)
336
337         for i, k := range ks {
338                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
339                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
340                 defer k.listener.Close()
341         }
342
343         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
344
345         reader, writer := io.Pipe()
346
347         go func() {
348                 writer.Write([]byte("foo"))
349                 writer.Close()
350         }()
351
352         kc.PutHR(hash, reader, 3)
353
354         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
355         log.Print(shuff)
356
357         s1 := <-st.handled
358         s2 := <-st.handled
359
360         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
361                 (s1 == shuff[1] && s2 == shuff[0]),
362                 Equals,
363                 true)
364
365         log.Printf("TestPutHR done")
366 }
367
368 func (s *StandaloneSuite) TestPutWithFail(c *C) {
369         log.Printf("TestPutWithFail")
370
371         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
372
373         st := StubPutHandler{
374                 c,
375                 hash,
376                 "abc123",
377                 "foo",
378                 make(chan string, 4)}
379
380         fh := FailHandler{
381                 make(chan string, 1)}
382
383         arv, err := arvadosclient.MakeArvadosClient()
384         kc, _ := MakeKeepClient(&arv)
385
386         kc.Want_replicas = 2
387         arv.ApiToken = "abc123"
388         localRoots := make(map[string]string)
389         writableLocalRoots := make(map[string]string)
390
391         ks1 := RunSomeFakeKeepServers(st, 4)
392         ks2 := RunSomeFakeKeepServers(fh, 1)
393
394         for i, k := range ks1 {
395                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
396                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
397                 defer k.listener.Close()
398         }
399         for i, k := range ks2 {
400                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
401                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
402                 defer k.listener.Close()
403         }
404
405         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
406
407         shuff := NewRootSorter(
408                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
409
410         phash, replicas, err := kc.PutB([]byte("foo"))
411
412         <-fh.handled
413
414         c.Check(err, Equals, nil)
415         c.Check(phash, Equals, "")
416         c.Check(replicas, Equals, 2)
417
418         s1 := <-st.handled
419         s2 := <-st.handled
420
421         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
422                 (s1 == shuff[2] && s2 == shuff[1]),
423                 Equals,
424                 true)
425 }
426
427 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
428         log.Printf("TestPutWithTooManyFail")
429
430         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
431
432         st := StubPutHandler{
433                 c,
434                 hash,
435                 "abc123",
436                 "foo",
437                 make(chan string, 1)}
438
439         fh := FailHandler{
440                 make(chan string, 4)}
441
442         arv, err := arvadosclient.MakeArvadosClient()
443         kc, _ := MakeKeepClient(&arv)
444
445         kc.Want_replicas = 2
446         arv.ApiToken = "abc123"
447         localRoots := make(map[string]string)
448         writableLocalRoots := make(map[string]string)
449
450         ks1 := RunSomeFakeKeepServers(st, 1)
451         ks2 := RunSomeFakeKeepServers(fh, 4)
452
453         for i, k := range ks1 {
454                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
455                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
456                 defer k.listener.Close()
457         }
458         for i, k := range ks2 {
459                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
460                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
461                 defer k.listener.Close()
462         }
463
464         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
465
466         _, replicas, err := kc.PutB([]byte("foo"))
467
468         c.Check(err, Equals, InsufficientReplicasError)
469         c.Check(replicas, Equals, 1)
470         c.Check(<-st.handled, Equals, ks1[0].url)
471
472         log.Printf("TestPutWithTooManyFail done")
473 }
474
475 type StubGetHandler struct {
476         c              *C
477         expectPath     string
478         expectApiToken string
479         httpStatus     int
480         body           []byte
481 }
482
483 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
484         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
485         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
486         resp.WriteHeader(sgh.httpStatus)
487         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
488         resp.Write(sgh.body)
489 }
490
491 func (s *StandaloneSuite) TestGet(c *C) {
492         log.Printf("TestGet")
493
494         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
495
496         st := StubGetHandler{
497                 c,
498                 hash,
499                 "abc123",
500                 http.StatusOK,
501                 []byte("foo")}
502
503         ks := RunFakeKeepServer(st)
504         defer ks.listener.Close()
505
506         arv, err := arvadosclient.MakeArvadosClient()
507         kc, _ := MakeKeepClient(&arv)
508         arv.ApiToken = "abc123"
509         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
510
511         r, n, url2, err := kc.Get(hash)
512         defer r.Close()
513         c.Check(err, Equals, nil)
514         c.Check(n, Equals, int64(3))
515         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
516
517         content, err2 := ioutil.ReadAll(r)
518         c.Check(err2, Equals, nil)
519         c.Check(content, DeepEquals, []byte("foo"))
520
521         log.Printf("TestGet done")
522 }
523
524 func (s *StandaloneSuite) TestGet404(c *C) {
525         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
526
527         st := Error404Handler{make(chan string, 1)}
528
529         ks := RunFakeKeepServer(st)
530         defer ks.listener.Close()
531
532         arv, err := arvadosclient.MakeArvadosClient()
533         kc, _ := MakeKeepClient(&arv)
534         arv.ApiToken = "abc123"
535         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
536
537         r, n, url2, err := kc.Get(hash)
538         c.Check(err, Equals, BlockNotFound)
539         c.Check(n, Equals, int64(0))
540         c.Check(url2, Equals, "")
541         c.Check(r, Equals, nil)
542 }
543
544 func (s *StandaloneSuite) TestGetFail(c *C) {
545         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
546
547         st := FailHandler{make(chan string, 1)}
548
549         ks := RunFakeKeepServer(st)
550         defer ks.listener.Close()
551
552         arv, err := arvadosclient.MakeArvadosClient()
553         kc, _ := MakeKeepClient(&arv)
554         arv.ApiToken = "abc123"
555         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
556
557         r, n, url2, err := kc.Get(hash)
558         errNotFound, _ := err.(ErrNotFound)
559         c.Check(errNotFound, NotNil)
560         c.Check(strings.Contains(err.Error(), "use of closed network connection"), Equals, true)
561         c.Check(n, Equals, int64(0))
562         c.Check(url2, Equals, "")
563         c.Check(r, Equals, nil)
564 }
565
566 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
567         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
568
569         st := &FailThenSucceedHandler{make(chan string, 1), 0,
570                 StubGetHandler{
571                         c,
572                         hash,
573                         "abc123",
574                         http.StatusOK,
575                         []byte("foo")}}
576
577         ks := RunFakeKeepServer(st)
578         defer ks.listener.Close()
579
580         arv, err := arvadosclient.MakeArvadosClient()
581         kc, _ := MakeKeepClient(&arv)
582         arv.ApiToken = "abc123"
583         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
584
585         r, n, url2, err := kc.Get(hash)
586         defer r.Close()
587         c.Check(err, Equals, nil)
588         c.Check(n, Equals, int64(3))
589         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
590
591         content, err2 := ioutil.ReadAll(r)
592         c.Check(err2, Equals, nil)
593         c.Check(content, DeepEquals, []byte("foo"))
594 }
595
596 func (s *StandaloneSuite) TestGetNetError(c *C) {
597         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
598
599         arv, err := arvadosclient.MakeArvadosClient()
600         kc, _ := MakeKeepClient(&arv)
601         arv.ApiToken = "abc123"
602         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
603
604         r, n, url2, err := kc.Get(hash)
605         errNotFound, _ := err.(ErrNotFound)
606         c.Check(errNotFound, NotNil)
607         c.Check(strings.Contains(err.Error(), "connection refused"), Equals, true)
608         c.Check(n, Equals, int64(0))
609         c.Check(url2, Equals, "")
610         c.Check(r, Equals, nil)
611 }
612
613 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
614         uuid := "zzzzz-bi6l4-123451234512345"
615         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
616
617         // This one shouldn't be used:
618         ks0 := RunFakeKeepServer(StubGetHandler{
619                 c,
620                 "error if used",
621                 "abc123",
622                 http.StatusOK,
623                 []byte("foo")})
624         defer ks0.listener.Close()
625         // This one should be used:
626         ks := RunFakeKeepServer(StubGetHandler{
627                 c,
628                 hash + "+K@" + uuid,
629                 "abc123",
630                 http.StatusOK,
631                 []byte("foo")})
632         defer ks.listener.Close()
633
634         arv, err := arvadosclient.MakeArvadosClient()
635         kc, _ := MakeKeepClient(&arv)
636         arv.ApiToken = "abc123"
637         kc.SetServiceRoots(
638                 map[string]string{"x": ks0.url},
639                 nil,
640                 map[string]string{uuid: ks.url})
641
642         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
643         defer r.Close()
644         c.Check(err, Equals, nil)
645         c.Check(n, Equals, int64(3))
646         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
647
648         content, err := ioutil.ReadAll(r)
649         c.Check(err, Equals, nil)
650         c.Check(content, DeepEquals, []byte("foo"))
651 }
652
653 // Use a service hint to fetch from a local disk service, overriding
654 // rendezvous probe order.
655 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
656         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
657         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
658
659         // This one shouldn't be used, although it appears first in
660         // rendezvous probe order:
661         ks0 := RunFakeKeepServer(StubGetHandler{
662                 c,
663                 "error if used",
664                 "abc123",
665                 http.StatusOK,
666                 []byte("foo")})
667         defer ks0.listener.Close()
668         // This one should be used:
669         ks := RunFakeKeepServer(StubGetHandler{
670                 c,
671                 hash + "+K@" + uuid,
672                 "abc123",
673                 http.StatusOK,
674                 []byte("foo")})
675         defer ks.listener.Close()
676
677         arv, err := arvadosclient.MakeArvadosClient()
678         kc, _ := MakeKeepClient(&arv)
679         arv.ApiToken = "abc123"
680         kc.SetServiceRoots(
681                 map[string]string{
682                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
683                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
684                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
685                         uuid: ks.url},
686                 nil,
687                 map[string]string{
688                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
689                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
690                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
691                         uuid: ks.url},
692         )
693
694         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
695         defer r.Close()
696         c.Check(err, Equals, nil)
697         c.Check(n, Equals, int64(3))
698         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
699
700         content, err := ioutil.ReadAll(r)
701         c.Check(err, Equals, nil)
702         c.Check(content, DeepEquals, []byte("foo"))
703 }
704
705 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
706         uuid := "zzzzz-bi6l4-123451234512345"
707         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
708
709         ksLocal := RunFakeKeepServer(StubGetHandler{
710                 c,
711                 hash + "+K@" + uuid,
712                 "abc123",
713                 http.StatusOK,
714                 []byte("foo")})
715         defer ksLocal.listener.Close()
716         ksGateway := RunFakeKeepServer(StubGetHandler{
717                 c,
718                 hash + "+K@" + uuid,
719                 "abc123",
720                 http.StatusInternalServerError,
721                 []byte("Error")})
722         defer ksGateway.listener.Close()
723
724         arv, err := arvadosclient.MakeArvadosClient()
725         kc, _ := MakeKeepClient(&arv)
726         arv.ApiToken = "abc123"
727         kc.SetServiceRoots(
728                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
729                 nil,
730                 map[string]string{uuid: ksGateway.url})
731
732         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
733         c.Assert(err, Equals, nil)
734         defer r.Close()
735         c.Check(n, Equals, int64(3))
736         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
737
738         content, err := ioutil.ReadAll(r)
739         c.Check(err, Equals, nil)
740         c.Check(content, DeepEquals, []byte("foo"))
741 }
742
// BarHandler is an http.Handler that answers every request with the body
// "bar" and reports the addressed host on handled.
type BarHandler struct {
	handled chan string
}

// ServeHTTP writes "bar" as the response body and then sends
// "http://<request host>" on handled.
func (bh BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	payload := []byte("bar")
	resp.Write(payload)
	bh.handled <- fmt.Sprintf("http://%s", req.Host)
}
751
752 func (s *StandaloneSuite) TestChecksum(c *C) {
753         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
754         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
755
756         st := BarHandler{make(chan string, 1)}
757
758         ks := RunFakeKeepServer(st)
759         defer ks.listener.Close()
760
761         arv, err := arvadosclient.MakeArvadosClient()
762         kc, _ := MakeKeepClient(&arv)
763         arv.ApiToken = "abc123"
764         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
765
766         r, n, _, err := kc.Get(barhash)
767         _, err = ioutil.ReadAll(r)
768         c.Check(n, Equals, int64(3))
769         c.Check(err, Equals, nil)
770
771         <-st.handled
772
773         r, n, _, err = kc.Get(foohash)
774         _, err = ioutil.ReadAll(r)
775         c.Check(n, Equals, int64(3))
776         c.Check(err, Equals, BadChecksum)
777
778         <-st.handled
779 }
780
781 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
782         content := []byte("waz")
783         hash := fmt.Sprintf("%x", md5.Sum(content))
784
785         fh := Error404Handler{
786                 make(chan string, 4)}
787
788         st := StubGetHandler{
789                 c,
790                 hash,
791                 "abc123",
792                 http.StatusOK,
793                 content}
794
795         arv, err := arvadosclient.MakeArvadosClient()
796         kc, _ := MakeKeepClient(&arv)
797         arv.ApiToken = "abc123"
798         localRoots := make(map[string]string)
799         writableLocalRoots := make(map[string]string)
800
801         ks1 := RunSomeFakeKeepServers(st, 1)
802         ks2 := RunSomeFakeKeepServers(fh, 4)
803
804         for i, k := range ks1 {
805                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
806                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
807                 defer k.listener.Close()
808         }
809         for i, k := range ks2 {
810                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
811                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
812                 defer k.listener.Close()
813         }
814
815         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
816
817         // This test works only if one of the failing services is
818         // attempted before the succeeding service. Otherwise,
819         // <-fh.handled below will just hang! (Probe order depends on
820         // the choice of block content "waz" and the UUIDs of the fake
821         // servers, so we just tried different strings until we found
822         // an example that passes this Assert.)
823         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
824
825         r, n, url2, err := kc.Get(hash)
826
827         <-fh.handled
828         c.Check(err, Equals, nil)
829         c.Check(n, Equals, int64(3))
830         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
831
832         read_content, err2 := ioutil.ReadAll(r)
833         c.Check(err2, Equals, nil)
834         c.Check(read_content, DeepEquals, content)
835 }
836
837 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
838         content := []byte("TestPutGetHead")
839
840         arv, err := arvadosclient.MakeArvadosClient()
841         kc, err := MakeKeepClient(&arv)
842         c.Assert(err, Equals, nil)
843
844         hash := fmt.Sprintf("%x", md5.Sum(content))
845
846         {
847                 n, _, err := kc.Ask(hash)
848                 c.Check(err, Equals, BlockNotFound)
849                 c.Check(n, Equals, int64(0))
850         }
851         {
852                 hash2, replicas, err := kc.PutB(content)
853                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
854                 c.Check(replicas, Equals, 2)
855                 c.Check(err, Equals, nil)
856         }
857         {
858                 r, n, url2, err := kc.Get(hash)
859                 c.Check(err, Equals, nil)
860                 c.Check(n, Equals, int64(len(content)))
861                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
862
863                 read_content, err2 := ioutil.ReadAll(r)
864                 c.Check(err2, Equals, nil)
865                 c.Check(read_content, DeepEquals, content)
866         }
867         {
868                 n, url2, err := kc.Ask(hash)
869                 c.Check(err, Equals, nil)
870                 c.Check(n, Equals, int64(len(content)))
871                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
872         }
873 }
874
// StubProxyHandler is an http.Handler that sets the response header
// X-Keep-Replicas-Stored to "2" on every request and reports the addressed
// host on handled.
type StubProxyHandler struct {
	handled chan string
}

// ServeHTTP sets X-Keep-Replicas-Stored to "2" and then sends
// "http://<request host>" on handled. No status or body is written
// explicitly.
func (sph StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	headers := resp.Header()
	headers.Set("X-Keep-Replicas-Stored", "2")
	sph.handled <- fmt.Sprintf("http://%s", req.Host)
}
883
884 func (s *StandaloneSuite) TestPutProxy(c *C) {
885         log.Printf("TestPutProxy")
886
887         st := StubProxyHandler{make(chan string, 1)}
888
889         arv, err := arvadosclient.MakeArvadosClient()
890         kc, _ := MakeKeepClient(&arv)
891
892         kc.Want_replicas = 2
893         kc.Using_proxy = true
894         arv.ApiToken = "abc123"
895         localRoots := make(map[string]string)
896         writableLocalRoots := make(map[string]string)
897
898         ks1 := RunSomeFakeKeepServers(st, 1)
899
900         for i, k := range ks1 {
901                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
902                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
903                 defer k.listener.Close()
904         }
905
906         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
907
908         _, replicas, err := kc.PutB([]byte("foo"))
909         <-st.handled
910
911         c.Check(err, Equals, nil)
912         c.Check(replicas, Equals, 2)
913
914         log.Printf("TestPutProxy done")
915 }
916
917 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
918         log.Printf("TestPutProxy")
919
920         st := StubProxyHandler{make(chan string, 1)}
921
922         arv, err := arvadosclient.MakeArvadosClient()
923         kc, _ := MakeKeepClient(&arv)
924
925         kc.Want_replicas = 3
926         kc.Using_proxy = true
927         arv.ApiToken = "abc123"
928         localRoots := make(map[string]string)
929         writableLocalRoots := make(map[string]string)
930
931         ks1 := RunSomeFakeKeepServers(st, 1)
932
933         for i, k := range ks1 {
934                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
935                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
936                 defer k.listener.Close()
937         }
938         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
939
940         _, replicas, err := kc.PutB([]byte("foo"))
941         <-st.handled
942
943         c.Check(err, Equals, InsufficientReplicasError)
944         c.Check(replicas, Equals, 2)
945
946         log.Printf("TestPutProxy done")
947 }
948
949 func (s *StandaloneSuite) TestMakeLocator(c *C) {
950         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
951         c.Check(err, Equals, nil)
952         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
953         c.Check(l.Size, Equals, 3)
954         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
955 }
956
957 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
958         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
959         c.Check(err, Equals, nil)
960         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
961         c.Check(l.Size, Equals, -1)
962         c.Check(l.Hints, DeepEquals, []string{})
963 }
964
965 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
966         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
967         c.Check(err, Equals, nil)
968         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
969         c.Check(l.Size, Equals, -1)
970         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
971 }
972
973 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
974         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
975         l, err := MakeLocator(str)
976         c.Check(err, Equals, nil)
977         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
978         c.Check(l.Size, Equals, 3)
979         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
980         c.Check(l.String(), Equals, str)
981 }
982
983 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
984         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
985         c.Check(err, Equals, InvalidLocatorError)
986 }
987
988 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
989         hash := Md5String("foo")
990
991         st := StubPutHandler{
992                 c,
993                 hash,
994                 "abc123",
995                 "foo",
996                 make(chan string, 5)}
997
998         arv, _ := arvadosclient.MakeArvadosClient()
999         kc, _ := MakeKeepClient(&arv)
1000
1001         kc.Want_replicas = 2
1002         arv.ApiToken = "abc123"
1003         localRoots := make(map[string]string)
1004         writableLocalRoots := make(map[string]string)
1005
1006         ks := RunSomeFakeKeepServers(st, 5)
1007
1008         for i, k := range ks {
1009                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1010                 if i == 0 {
1011                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1012                 }
1013                 defer k.listener.Close()
1014         }
1015
1016         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1017
1018         _, replicas, err := kc.PutB([]byte("foo"))
1019
1020         c.Check(err, Equals, InsufficientReplicasError)
1021         c.Check(replicas, Equals, 1)
1022
1023         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1024 }
1025
1026 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1027         hash := Md5String("foo")
1028
1029         st := StubPutHandler{
1030                 c,
1031                 hash,
1032                 "abc123",
1033                 "foo",
1034                 make(chan string, 5)}
1035
1036         arv, _ := arvadosclient.MakeArvadosClient()
1037         kc, _ := MakeKeepClient(&arv)
1038
1039         kc.Want_replicas = 2
1040         arv.ApiToken = "abc123"
1041         localRoots := make(map[string]string)
1042         writableLocalRoots := make(map[string]string)
1043
1044         ks := RunSomeFakeKeepServers(st, 5)
1045
1046         for i, k := range ks {
1047                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1048                 defer k.listener.Close()
1049         }
1050
1051         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1052
1053         _, replicas, err := kc.PutB([]byte("foo"))
1054
1055         c.Check(err, Equals, InsufficientReplicasError)
1056         c.Check(replicas, Equals, 0)
1057 }
1058
1059 type StubGetIndexHandler struct {
1060         c              *C
1061         expectPath     string
1062         expectAPIToken string
1063         httpStatus     int
1064         body           []byte
1065 }
1066
1067 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1068         h.c.Check(req.URL.Path, Equals, h.expectPath)
1069         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1070         resp.WriteHeader(h.httpStatus)
1071         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1072         resp.Write(h.body)
1073 }
1074
1075 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1076         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1077
1078         st := StubGetIndexHandler{
1079                 c,
1080                 "/index",
1081                 "abc123",
1082                 http.StatusOK,
1083                 []byte(hash + "+3 1443559274\n\n")}
1084
1085         ks := RunFakeKeepServer(st)
1086         defer ks.listener.Close()
1087
1088         arv, err := arvadosclient.MakeArvadosClient()
1089         kc, _ := MakeKeepClient(&arv)
1090         arv.ApiToken = "abc123"
1091         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1092
1093         r, err := kc.GetIndex("x", "")
1094         c.Check(err, Equals, nil)
1095
1096         content, err2 := ioutil.ReadAll(r)
1097         c.Check(err2, Equals, nil)
1098         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1099 }
1100
1101 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1102         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1103
1104         st := StubGetIndexHandler{
1105                 c,
1106                 "/index/" + hash[0:3],
1107                 "abc123",
1108                 http.StatusOK,
1109                 []byte(hash + "+3 1443559274\n\n")}
1110
1111         ks := RunFakeKeepServer(st)
1112         defer ks.listener.Close()
1113
1114         arv, err := arvadosclient.MakeArvadosClient()
1115         kc, _ := MakeKeepClient(&arv)
1116         arv.ApiToken = "abc123"
1117         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1118
1119         r, err := kc.GetIndex("x", hash[0:3])
1120         c.Check(err, Equals, nil)
1121
1122         content, err2 := ioutil.ReadAll(r)
1123         c.Check(err2, Equals, nil)
1124         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1125 }
1126
1127 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1128         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1129
1130         st := StubGetIndexHandler{
1131                 c,
1132                 "/index/" + hash[0:3],
1133                 "abc123",
1134                 http.StatusOK,
1135                 []byte(hash)}
1136
1137         ks := RunFakeKeepServer(st)
1138         defer ks.listener.Close()
1139
1140         arv, err := arvadosclient.MakeArvadosClient()
1141         kc, _ := MakeKeepClient(&arv)
1142         arv.ApiToken = "abc123"
1143         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1144
1145         _, err = kc.GetIndex("x", hash[0:3])
1146         c.Check(err, Equals, ErrIncompleteIndex)
1147 }
1148
1149 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1150         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1151
1152         st := StubGetIndexHandler{
1153                 c,
1154                 "/index/" + hash[0:3],
1155                 "abc123",
1156                 http.StatusOK,
1157                 []byte(hash)}
1158
1159         ks := RunFakeKeepServer(st)
1160         defer ks.listener.Close()
1161
1162         arv, err := arvadosclient.MakeArvadosClient()
1163         kc, _ := MakeKeepClient(&arv)
1164         arv.ApiToken = "abc123"
1165         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1166
1167         _, err = kc.GetIndex("y", hash[0:3])
1168         c.Check(err, Equals, ErrNoSuchKeepServer)
1169 }
1170
1171 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1172         st := StubGetIndexHandler{
1173                 c,
1174                 "/index/abcd",
1175                 "abc123",
1176                 http.StatusOK,
1177                 []byte("\n")}
1178
1179         ks := RunFakeKeepServer(st)
1180         defer ks.listener.Close()
1181
1182         arv, err := arvadosclient.MakeArvadosClient()
1183         kc, _ := MakeKeepClient(&arv)
1184         arv.ApiToken = "abc123"
1185         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1186
1187         r, err := kc.GetIndex("x", "abcd")
1188         c.Check(err, Equals, nil)
1189
1190         content, err2 := ioutil.ReadAll(r)
1191         c.Check(err2, Equals, nil)
1192         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1193 }