Merge branch 'master' into 7492-keepproxy-upstream-errors
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9         "git.curoverse.com/arvados.git/sdk/go/streamer"
10         . "gopkg.in/check.v1"
11         "io"
12         "io/ioutil"
13         "log"
14         "net"
15         "net/http"
16         "os"
17         "strings"
18         "testing"
19 )
20
21 // Gocheck boilerplate
22 func Test(t *testing.T) {
23         TestingT(t)
24 }
25
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
29
30 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31
type (
	// ServerRequiredSuite holds the tests that require the Keep server
	// to be running.
	ServerRequiredSuite struct{}

	// StandaloneSuite holds the standalone tests.
	StandaloneSuite struct{}
)
37
// pythonDir returns the path "<cwd>/../../python/tests", where <cwd> is the
// current working directory. An error from os.Getwd is ignored, in which
// case the path is built from whatever directory string Getwd returned.
func pythonDir() string {
	workDir, _ := os.Getwd()
	return fmt.Sprintf("%s/%s", workDir, "../../python/tests")
}
42
43 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44         if *no_server {
45                 c.Skip("Skipping tests that require server")
46                 return
47         }
48         arvadostest.StartAPI()
49         arvadostest.StartKeep(2, false)
50 }
51
52 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
53         if *no_server {
54                 return
55         }
56         arvadostest.StopKeep(2)
57         arvadostest.StopAPI()
58 }
59
60 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
61         arv, err := arvadosclient.MakeArvadosClient()
62         c.Assert(err, Equals, nil)
63
64         kc, err := MakeKeepClient(&arv)
65
66         c.Assert(err, Equals, nil)
67         c.Check(len(kc.LocalRoots()), Equals, 2)
68         for _, root := range kc.LocalRoots() {
69                 c.Check(root, Matches, "http://localhost:\\d+")
70         }
71 }
72
73 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
74         arv, err := arvadosclient.MakeArvadosClient()
75         c.Assert(err, Equals, nil)
76
77         kc, err := MakeKeepClient(&arv)
78         c.Assert(kc.Want_replicas, Equals, 2)
79
80         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
81         kc, err = MakeKeepClient(&arv)
82         c.Assert(kc.Want_replicas, Equals, 3)
83
84         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
85         kc, err = MakeKeepClient(&arv)
86         c.Assert(kc.Want_replicas, Equals, 1)
87 }
88
89 type StubPutHandler struct {
90         c              *C
91         expectPath     string
92         expectApiToken string
93         expectBody     string
94         handled        chan string
95 }
96
97 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
98         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
99         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
100         body, err := ioutil.ReadAll(req.Body)
101         sph.c.Check(err, Equals, nil)
102         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
103         resp.WriteHeader(200)
104         sph.handled <- fmt.Sprintf("http://%s", req.Host)
105 }
106
107 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
108         var err error
109         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
110         if err != nil {
111                 panic(fmt.Sprintf("Could not listen on any port"))
112         }
113         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
114         go http.Serve(ks.listener, st)
115         return
116 }
117
118 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
119         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
120
121         ks := RunFakeKeepServer(st)
122         defer ks.listener.Close()
123
124         arv, _ := arvadosclient.MakeArvadosClient()
125         arv.ApiToken = "abc123"
126
127         kc, _ := MakeKeepClient(&arv)
128
129         reader, writer := io.Pipe()
130         upload_status := make(chan uploadStatus)
131
132         f(kc, ks.url, reader, writer, upload_status)
133 }
134
135 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
136         log.Printf("TestUploadToStubKeepServer")
137
138         st := StubPutHandler{
139                 c,
140                 "acbd18db4cc2f85cedef654fccc4a4d8",
141                 "abc123",
142                 "foo",
143                 make(chan string)}
144
145         UploadToStubHelper(c, st,
146                 func(kc *KeepClient, url string, reader io.ReadCloser,
147                         writer io.WriteCloser, upload_status chan uploadStatus) {
148
149                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
150
151                         writer.Write([]byte("foo"))
152                         writer.Close()
153
154                         <-st.handled
155                         status := <-upload_status
156                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
157                 })
158
159         log.Printf("TestUploadToStubKeepServer done")
160 }
161
162 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
163         log.Printf("TestUploadToStubKeepServerBufferReader")
164
165         st := StubPutHandler{
166                 c,
167                 "acbd18db4cc2f85cedef654fccc4a4d8",
168                 "abc123",
169                 "foo",
170                 make(chan string)}
171
172         UploadToStubHelper(c, st,
173                 func(kc *KeepClient, url string, reader io.ReadCloser,
174                         writer io.WriteCloser, upload_status chan uploadStatus) {
175
176                         tr := streamer.AsyncStreamFromReader(512, reader)
177                         defer tr.Close()
178
179                         br1 := tr.MakeStreamReader()
180
181                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
182
183                         writer.Write([]byte("foo"))
184                         writer.Close()
185
186                         <-st.handled
187
188                         status := <-upload_status
189                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
190                 })
191
192         log.Printf("TestUploadToStubKeepServerBufferReader done")
193 }
194
// FailHandler is an http.Handler that fails every request with status 500.
type FailHandler struct {
	// handled receives "http://<request host>" for every request served.
	handled chan string
}

// ServeHTTP answers with 500 and then reports the request host on handled.
func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
203
204 type FailThenSucceedHandler struct {
205         handled        chan string
206         count          int
207         successhandler StubGetHandler
208 }
209
210 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
211         if fh.count == 0 {
212                 resp.WriteHeader(500)
213                 fh.count += 1
214                 fh.handled <- fmt.Sprintf("http://%s", req.Host)
215         } else {
216                 fh.successhandler.ServeHTTP(resp, req)
217         }
218 }
219
// Error404Handler is an http.Handler that answers every request with
// status 404.
type Error404Handler struct {
	// handled receives "http://<request host>" for every request served.
	handled chan string
}

// ServeHTTP answers with 404 and then reports the request host on handled.
func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusNotFound)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
228
229 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
230         log.Printf("TestFailedUploadToStubKeepServer")
231
232         st := FailHandler{
233                 make(chan string)}
234
235         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
236
237         UploadToStubHelper(c, st,
238                 func(kc *KeepClient, url string, reader io.ReadCloser,
239                         writer io.WriteCloser, upload_status chan uploadStatus) {
240
241                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
242
243                         writer.Write([]byte("foo"))
244                         writer.Close()
245
246                         <-st.handled
247
248                         status := <-upload_status
249                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
250                         c.Check(status.statusCode, Equals, 500)
251                 })
252         log.Printf("TestFailedUploadToStubKeepServer done")
253 }
254
// KeepServer describes one fake Keep server started by RunFakeKeepServer.
type KeepServer struct {
	// listener is the socket the fake server accepts connections on;
	// closing it shuts the server down.
	listener net.Listener
	// url is the server's base URL, "http://<listener address>".
	url string
}
259
260 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
261         ks = make([]KeepServer, n)
262
263         for i := 0; i < n; i += 1 {
264                 ks[i] = RunFakeKeepServer(st)
265         }
266
267         return ks
268 }
269
270 func (s *StandaloneSuite) TestPutB(c *C) {
271         log.Printf("TestPutB")
272
273         hash := Md5String("foo")
274
275         st := StubPutHandler{
276                 c,
277                 hash,
278                 "abc123",
279                 "foo",
280                 make(chan string, 5)}
281
282         arv, _ := arvadosclient.MakeArvadosClient()
283         kc, _ := MakeKeepClient(&arv)
284
285         kc.Want_replicas = 2
286         arv.ApiToken = "abc123"
287         localRoots := make(map[string]string)
288         writableLocalRoots := make(map[string]string)
289
290         ks := RunSomeFakeKeepServers(st, 5)
291
292         for i, k := range ks {
293                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
294                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
295                 defer k.listener.Close()
296         }
297
298         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
299
300         kc.PutB([]byte("foo"))
301
302         shuff := NewRootSorter(
303                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
304
305         s1 := <-st.handled
306         s2 := <-st.handled
307         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
308                 (s1 == shuff[1] && s2 == shuff[0]),
309                 Equals,
310                 true)
311
312         log.Printf("TestPutB done")
313 }
314
315 func (s *StandaloneSuite) TestPutHR(c *C) {
316         log.Printf("TestPutHR")
317
318         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
319
320         st := StubPutHandler{
321                 c,
322                 hash,
323                 "abc123",
324                 "foo",
325                 make(chan string, 5)}
326
327         arv, _ := arvadosclient.MakeArvadosClient()
328         kc, _ := MakeKeepClient(&arv)
329
330         kc.Want_replicas = 2
331         arv.ApiToken = "abc123"
332         localRoots := make(map[string]string)
333         writableLocalRoots := make(map[string]string)
334
335         ks := RunSomeFakeKeepServers(st, 5)
336
337         for i, k := range ks {
338                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
339                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
340                 defer k.listener.Close()
341         }
342
343         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
344
345         reader, writer := io.Pipe()
346
347         go func() {
348                 writer.Write([]byte("foo"))
349                 writer.Close()
350         }()
351
352         kc.PutHR(hash, reader, 3)
353
354         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
355         log.Print(shuff)
356
357         s1 := <-st.handled
358         s2 := <-st.handled
359
360         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
361                 (s1 == shuff[1] && s2 == shuff[0]),
362                 Equals,
363                 true)
364
365         log.Printf("TestPutHR done")
366 }
367
368 func (s *StandaloneSuite) TestPutWithFail(c *C) {
369         log.Printf("TestPutWithFail")
370
371         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
372
373         st := StubPutHandler{
374                 c,
375                 hash,
376                 "abc123",
377                 "foo",
378                 make(chan string, 4)}
379
380         fh := FailHandler{
381                 make(chan string, 1)}
382
383         arv, err := arvadosclient.MakeArvadosClient()
384         kc, _ := MakeKeepClient(&arv)
385
386         kc.Want_replicas = 2
387         arv.ApiToken = "abc123"
388         localRoots := make(map[string]string)
389         writableLocalRoots := make(map[string]string)
390
391         ks1 := RunSomeFakeKeepServers(st, 4)
392         ks2 := RunSomeFakeKeepServers(fh, 1)
393
394         for i, k := range ks1 {
395                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
396                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
397                 defer k.listener.Close()
398         }
399         for i, k := range ks2 {
400                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
401                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
402                 defer k.listener.Close()
403         }
404
405         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
406
407         shuff := NewRootSorter(
408                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
409
410         phash, replicas, err := kc.PutB([]byte("foo"))
411
412         <-fh.handled
413
414         c.Check(err, Equals, nil)
415         c.Check(phash, Equals, "")
416         c.Check(replicas, Equals, 2)
417
418         s1 := <-st.handled
419         s2 := <-st.handled
420
421         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
422                 (s1 == shuff[2] && s2 == shuff[1]),
423                 Equals,
424                 true)
425 }
426
427 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
428         log.Printf("TestPutWithTooManyFail")
429
430         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
431
432         st := StubPutHandler{
433                 c,
434                 hash,
435                 "abc123",
436                 "foo",
437                 make(chan string, 1)}
438
439         fh := FailHandler{
440                 make(chan string, 4)}
441
442         arv, err := arvadosclient.MakeArvadosClient()
443         kc, _ := MakeKeepClient(&arv)
444
445         kc.Want_replicas = 2
446         kc.Retries = 0
447         arv.ApiToken = "abc123"
448         localRoots := make(map[string]string)
449         writableLocalRoots := make(map[string]string)
450
451         ks1 := RunSomeFakeKeepServers(st, 1)
452         ks2 := RunSomeFakeKeepServers(fh, 4)
453
454         for i, k := range ks1 {
455                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
456                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
457                 defer k.listener.Close()
458         }
459         for i, k := range ks2 {
460                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
461                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
462                 defer k.listener.Close()
463         }
464
465         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
466
467         _, replicas, err := kc.PutB([]byte("foo"))
468
469         c.Check(err, Equals, InsufficientReplicasError)
470         c.Check(replicas, Equals, 1)
471         c.Check(<-st.handled, Equals, ks1[0].url)
472
473         log.Printf("TestPutWithTooManyFail done")
474 }
475
476 type StubGetHandler struct {
477         c              *C
478         expectPath     string
479         expectApiToken string
480         httpStatus     int
481         body           []byte
482 }
483
484 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
485         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
486         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
487         resp.WriteHeader(sgh.httpStatus)
488         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
489         resp.Write(sgh.body)
490 }
491
492 func (s *StandaloneSuite) TestGet(c *C) {
493         log.Printf("TestGet")
494
495         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
496
497         st := StubGetHandler{
498                 c,
499                 hash,
500                 "abc123",
501                 http.StatusOK,
502                 []byte("foo")}
503
504         ks := RunFakeKeepServer(st)
505         defer ks.listener.Close()
506
507         arv, err := arvadosclient.MakeArvadosClient()
508         kc, _ := MakeKeepClient(&arv)
509         arv.ApiToken = "abc123"
510         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
511
512         r, n, url2, err := kc.Get(hash)
513         defer r.Close()
514         c.Check(err, Equals, nil)
515         c.Check(n, Equals, int64(3))
516         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
517
518         content, err2 := ioutil.ReadAll(r)
519         c.Check(err2, Equals, nil)
520         c.Check(content, DeepEquals, []byte("foo"))
521
522         log.Printf("TestGet done")
523 }
524
525 func (s *StandaloneSuite) TestGet404(c *C) {
526         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
527
528         st := Error404Handler{make(chan string, 1)}
529
530         ks := RunFakeKeepServer(st)
531         defer ks.listener.Close()
532
533         arv, err := arvadosclient.MakeArvadosClient()
534         kc, _ := MakeKeepClient(&arv)
535         arv.ApiToken = "abc123"
536         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
537
538         r, n, url2, err := kc.Get(hash)
539         c.Check(err, Equals, BlockNotFound)
540         c.Check(n, Equals, int64(0))
541         c.Check(url2, Equals, "")
542         c.Check(r, Equals, nil)
543 }
544
545 func (s *StandaloneSuite) TestGetFail(c *C) {
546         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
547
548         st := FailHandler{make(chan string, 1)}
549
550         ks := RunFakeKeepServer(st)
551         defer ks.listener.Close()
552
553         arv, err := arvadosclient.MakeArvadosClient()
554         kc, _ := MakeKeepClient(&arv)
555         arv.ApiToken = "abc123"
556         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
557         kc.Retries = 0
558
559         r, n, url2, err := kc.Get(hash)
560         errNotFound, _ := err.(ErrNotFound)
561         c.Check(errNotFound, NotNil)
562         c.Check(n, Equals, int64(0))
563         c.Check(url2, Equals, "")
564         c.Check(r, Equals, nil)
565 }
566
567 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
568         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
569
570         st := &FailThenSucceedHandler{make(chan string, 1), 0,
571                 StubGetHandler{
572                         c,
573                         hash,
574                         "abc123",
575                         http.StatusOK,
576                         []byte("foo")}}
577
578         ks := RunFakeKeepServer(st)
579         defer ks.listener.Close()
580
581         arv, err := arvadosclient.MakeArvadosClient()
582         kc, _ := MakeKeepClient(&arv)
583         arv.ApiToken = "abc123"
584         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
585
586         r, n, url2, err := kc.Get(hash)
587         defer r.Close()
588         c.Check(err, Equals, nil)
589         c.Check(n, Equals, int64(3))
590         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
591
592         content, err2 := ioutil.ReadAll(r)
593         c.Check(err2, Equals, nil)
594         c.Check(content, DeepEquals, []byte("foo"))
595 }
596
597 func (s *StandaloneSuite) TestGetNetError(c *C) {
598         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
599
600         arv, err := arvadosclient.MakeArvadosClient()
601         kc, _ := MakeKeepClient(&arv)
602         arv.ApiToken = "abc123"
603         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
604
605         r, n, url2, err := kc.Get(hash)
606         errNotFound, _ := err.(ErrNotFound)
607         c.Check(errNotFound, NotNil)
608         c.Check(strings.Contains(err.Error(), "connection refused"), Equals, true)
609         c.Check(n, Equals, int64(0))
610         c.Check(url2, Equals, "")
611         c.Check(r, Equals, nil)
612 }
613
614 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
615         uuid := "zzzzz-bi6l4-123451234512345"
616         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
617
618         // This one shouldn't be used:
619         ks0 := RunFakeKeepServer(StubGetHandler{
620                 c,
621                 "error if used",
622                 "abc123",
623                 http.StatusOK,
624                 []byte("foo")})
625         defer ks0.listener.Close()
626         // This one should be used:
627         ks := RunFakeKeepServer(StubGetHandler{
628                 c,
629                 hash + "+K@" + uuid,
630                 "abc123",
631                 http.StatusOK,
632                 []byte("foo")})
633         defer ks.listener.Close()
634
635         arv, err := arvadosclient.MakeArvadosClient()
636         kc, _ := MakeKeepClient(&arv)
637         arv.ApiToken = "abc123"
638         kc.SetServiceRoots(
639                 map[string]string{"x": ks0.url},
640                 nil,
641                 map[string]string{uuid: ks.url})
642
643         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
644         defer r.Close()
645         c.Check(err, Equals, nil)
646         c.Check(n, Equals, int64(3))
647         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
648
649         content, err := ioutil.ReadAll(r)
650         c.Check(err, Equals, nil)
651         c.Check(content, DeepEquals, []byte("foo"))
652 }
653
654 // Use a service hint to fetch from a local disk service, overriding
655 // rendezvous probe order.
656 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
657         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
658         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
659
660         // This one shouldn't be used, although it appears first in
661         // rendezvous probe order:
662         ks0 := RunFakeKeepServer(StubGetHandler{
663                 c,
664                 "error if used",
665                 "abc123",
666                 http.StatusOK,
667                 []byte("foo")})
668         defer ks0.listener.Close()
669         // This one should be used:
670         ks := RunFakeKeepServer(StubGetHandler{
671                 c,
672                 hash + "+K@" + uuid,
673                 "abc123",
674                 http.StatusOK,
675                 []byte("foo")})
676         defer ks.listener.Close()
677
678         arv, err := arvadosclient.MakeArvadosClient()
679         kc, _ := MakeKeepClient(&arv)
680         arv.ApiToken = "abc123"
681         kc.SetServiceRoots(
682                 map[string]string{
683                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
684                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
685                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
686                         uuid: ks.url},
687                 nil,
688                 map[string]string{
689                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
690                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
691                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
692                         uuid: ks.url},
693         )
694
695         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
696         defer r.Close()
697         c.Check(err, Equals, nil)
698         c.Check(n, Equals, int64(3))
699         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
700
701         content, err := ioutil.ReadAll(r)
702         c.Check(err, Equals, nil)
703         c.Check(content, DeepEquals, []byte("foo"))
704 }
705
706 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
707         uuid := "zzzzz-bi6l4-123451234512345"
708         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
709
710         ksLocal := RunFakeKeepServer(StubGetHandler{
711                 c,
712                 hash + "+K@" + uuid,
713                 "abc123",
714                 http.StatusOK,
715                 []byte("foo")})
716         defer ksLocal.listener.Close()
717         ksGateway := RunFakeKeepServer(StubGetHandler{
718                 c,
719                 hash + "+K@" + uuid,
720                 "abc123",
721                 http.StatusInternalServerError,
722                 []byte("Error")})
723         defer ksGateway.listener.Close()
724
725         arv, err := arvadosclient.MakeArvadosClient()
726         kc, _ := MakeKeepClient(&arv)
727         arv.ApiToken = "abc123"
728         kc.SetServiceRoots(
729                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
730                 nil,
731                 map[string]string{uuid: ksGateway.url})
732
733         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
734         c.Assert(err, Equals, nil)
735         defer r.Close()
736         c.Check(n, Equals, int64(3))
737         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
738
739         content, err := ioutil.ReadAll(r)
740         c.Check(err, Equals, nil)
741         c.Check(content, DeepEquals, []byte("foo"))
742 }
743
// BarHandler is an http.Handler that answers every request with the body
// "bar", whatever was asked for.
type BarHandler struct {
	// handled receives "http://<request host>" for every request served.
	handled chan string
}

// ServeHTTP writes "bar" and then reports the request host on handled.
func (bh BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.Write([]byte("bar"))
	origin := fmt.Sprintf("http://%s", req.Host)
	bh.handled <- origin
}
752
753 func (s *StandaloneSuite) TestChecksum(c *C) {
754         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
755         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
756
757         st := BarHandler{make(chan string, 1)}
758
759         ks := RunFakeKeepServer(st)
760         defer ks.listener.Close()
761
762         arv, err := arvadosclient.MakeArvadosClient()
763         kc, _ := MakeKeepClient(&arv)
764         arv.ApiToken = "abc123"
765         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
766
767         r, n, _, err := kc.Get(barhash)
768         _, err = ioutil.ReadAll(r)
769         c.Check(n, Equals, int64(3))
770         c.Check(err, Equals, nil)
771
772         <-st.handled
773
774         r, n, _, err = kc.Get(foohash)
775         _, err = ioutil.ReadAll(r)
776         c.Check(n, Equals, int64(3))
777         c.Check(err, Equals, BadChecksum)
778
779         <-st.handled
780 }
781
782 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
783         content := []byte("waz")
784         hash := fmt.Sprintf("%x", md5.Sum(content))
785
786         fh := Error404Handler{
787                 make(chan string, 4)}
788
789         st := StubGetHandler{
790                 c,
791                 hash,
792                 "abc123",
793                 http.StatusOK,
794                 content}
795
796         arv, err := arvadosclient.MakeArvadosClient()
797         kc, _ := MakeKeepClient(&arv)
798         arv.ApiToken = "abc123"
799         localRoots := make(map[string]string)
800         writableLocalRoots := make(map[string]string)
801
802         ks1 := RunSomeFakeKeepServers(st, 1)
803         ks2 := RunSomeFakeKeepServers(fh, 4)
804
805         for i, k := range ks1 {
806                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
807                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
808                 defer k.listener.Close()
809         }
810         for i, k := range ks2 {
811                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
812                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
813                 defer k.listener.Close()
814         }
815
816         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
817         kc.Retries = 0
818
819         // This test works only if one of the failing services is
820         // attempted before the succeeding service. Otherwise,
821         // <-fh.handled below will just hang! (Probe order depends on
822         // the choice of block content "waz" and the UUIDs of the fake
823         // servers, so we just tried different strings until we found
824         // an example that passes this Assert.)
825         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
826
827         r, n, url2, err := kc.Get(hash)
828
829         <-fh.handled
830         c.Check(err, Equals, nil)
831         c.Check(n, Equals, int64(3))
832         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
833
834         read_content, err2 := ioutil.ReadAll(r)
835         c.Check(err2, Equals, nil)
836         c.Check(read_content, DeepEquals, content)
837 }
838
839 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
840         content := []byte("TestPutGetHead")
841
842         arv, err := arvadosclient.MakeArvadosClient()
843         kc, err := MakeKeepClient(&arv)
844         c.Assert(err, Equals, nil)
845
846         hash := fmt.Sprintf("%x", md5.Sum(content))
847
848         {
849                 n, _, err := kc.Ask(hash)
850                 c.Check(err, Equals, BlockNotFound)
851                 c.Check(n, Equals, int64(0))
852         }
853         {
854                 hash2, replicas, err := kc.PutB(content)
855                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
856                 c.Check(replicas, Equals, 2)
857                 c.Check(err, Equals, nil)
858         }
859         {
860                 r, n, url2, err := kc.Get(hash)
861                 c.Check(err, Equals, nil)
862                 c.Check(n, Equals, int64(len(content)))
863                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
864
865                 read_content, err2 := ioutil.ReadAll(r)
866                 c.Check(err2, Equals, nil)
867                 c.Check(read_content, DeepEquals, content)
868         }
869         {
870                 n, url2, err := kc.Ask(hash)
871                 c.Check(err, Equals, nil)
872                 c.Check(n, Equals, int64(len(content)))
873                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
874         }
875 }
876
// StubProxyHandler is an http.Handler that sets the
// X-Keep-Replicas-Stored response header to "2" on every request.
type StubProxyHandler struct {
	// handled receives "http://<request host>" for every request served.
	handled chan string
}

// ServeHTTP sets the replica-count header and then reports the request
// host on handled.
func (ph StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.Header().Set("X-Keep-Replicas-Stored", "2")
	origin := fmt.Sprintf("http://%s", req.Host)
	ph.handled <- origin
}
885
886 func (s *StandaloneSuite) TestPutProxy(c *C) {
887         log.Printf("TestPutProxy")
888
889         st := StubProxyHandler{make(chan string, 1)}
890
891         arv, err := arvadosclient.MakeArvadosClient()
892         kc, _ := MakeKeepClient(&arv)
893
894         kc.Want_replicas = 2
895         kc.Using_proxy = true
896         arv.ApiToken = "abc123"
897         localRoots := make(map[string]string)
898         writableLocalRoots := make(map[string]string)
899
900         ks1 := RunSomeFakeKeepServers(st, 1)
901
902         for i, k := range ks1 {
903                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
904                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
905                 defer k.listener.Close()
906         }
907
908         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
909
910         _, replicas, err := kc.PutB([]byte("foo"))
911         <-st.handled
912
913         c.Check(err, Equals, nil)
914         c.Check(replicas, Equals, 2)
915
916         log.Printf("TestPutProxy done")
917 }
918
919 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
920         log.Printf("TestPutProxy")
921
922         st := StubProxyHandler{make(chan string, 1)}
923
924         arv, err := arvadosclient.MakeArvadosClient()
925         kc, _ := MakeKeepClient(&arv)
926
927         kc.Want_replicas = 3
928         kc.Using_proxy = true
929         arv.ApiToken = "abc123"
930         localRoots := make(map[string]string)
931         writableLocalRoots := make(map[string]string)
932
933         ks1 := RunSomeFakeKeepServers(st, 1)
934
935         for i, k := range ks1 {
936                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
937                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
938                 defer k.listener.Close()
939         }
940         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
941
942         _, replicas, err := kc.PutB([]byte("foo"))
943         <-st.handled
944
945         c.Check(err, Equals, InsufficientReplicasError)
946         c.Check(replicas, Equals, 2)
947
948         log.Printf("TestPutProxy done")
949 }
950
951 func (s *StandaloneSuite) TestMakeLocator(c *C) {
952         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
953         c.Check(err, Equals, nil)
954         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
955         c.Check(l.Size, Equals, 3)
956         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
957 }
958
959 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
960         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
961         c.Check(err, Equals, nil)
962         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
963         c.Check(l.Size, Equals, -1)
964         c.Check(l.Hints, DeepEquals, []string{})
965 }
966
967 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
968         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
969         c.Check(err, Equals, nil)
970         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
971         c.Check(l.Size, Equals, -1)
972         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
973 }
974
975 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
976         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
977         l, err := MakeLocator(str)
978         c.Check(err, Equals, nil)
979         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
980         c.Check(l.Size, Equals, 3)
981         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
982         c.Check(l.String(), Equals, str)
983 }
984
985 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
986         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
987         c.Check(err, Equals, InvalidLocatorError)
988 }
989
990 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
991         hash := Md5String("foo")
992
993         st := StubPutHandler{
994                 c,
995                 hash,
996                 "abc123",
997                 "foo",
998                 make(chan string, 5)}
999
1000         arv, _ := arvadosclient.MakeArvadosClient()
1001         kc, _ := MakeKeepClient(&arv)
1002
1003         kc.Want_replicas = 2
1004         arv.ApiToken = "abc123"
1005         localRoots := make(map[string]string)
1006         writableLocalRoots := make(map[string]string)
1007
1008         ks := RunSomeFakeKeepServers(st, 5)
1009
1010         for i, k := range ks {
1011                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1012                 if i == 0 {
1013                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1014                 }
1015                 defer k.listener.Close()
1016         }
1017
1018         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1019
1020         _, replicas, err := kc.PutB([]byte("foo"))
1021
1022         c.Check(err, Equals, InsufficientReplicasError)
1023         c.Check(replicas, Equals, 1)
1024
1025         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1026 }
1027
1028 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1029         hash := Md5String("foo")
1030
1031         st := StubPutHandler{
1032                 c,
1033                 hash,
1034                 "abc123",
1035                 "foo",
1036                 make(chan string, 5)}
1037
1038         arv, _ := arvadosclient.MakeArvadosClient()
1039         kc, _ := MakeKeepClient(&arv)
1040
1041         kc.Want_replicas = 2
1042         arv.ApiToken = "abc123"
1043         localRoots := make(map[string]string)
1044         writableLocalRoots := make(map[string]string)
1045
1046         ks := RunSomeFakeKeepServers(st, 5)
1047
1048         for i, k := range ks {
1049                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1050                 defer k.listener.Close()
1051         }
1052
1053         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1054
1055         _, replicas, err := kc.PutB([]byte("foo"))
1056
1057         c.Check(err, Equals, InsufficientReplicasError)
1058         c.Check(replicas, Equals, 0)
1059 }
1060
1061 type StubGetIndexHandler struct {
1062         c              *C
1063         expectPath     string
1064         expectAPIToken string
1065         httpStatus     int
1066         body           []byte
1067 }
1068
1069 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1070         h.c.Check(req.URL.Path, Equals, h.expectPath)
1071         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1072         resp.WriteHeader(h.httpStatus)
1073         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1074         resp.Write(h.body)
1075 }
1076
1077 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1078         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1079
1080         st := StubGetIndexHandler{
1081                 c,
1082                 "/index",
1083                 "abc123",
1084                 http.StatusOK,
1085                 []byte(hash + "+3 1443559274\n\n")}
1086
1087         ks := RunFakeKeepServer(st)
1088         defer ks.listener.Close()
1089
1090         arv, err := arvadosclient.MakeArvadosClient()
1091         kc, _ := MakeKeepClient(&arv)
1092         arv.ApiToken = "abc123"
1093         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1094
1095         r, err := kc.GetIndex("x", "")
1096         c.Check(err, Equals, nil)
1097
1098         content, err2 := ioutil.ReadAll(r)
1099         c.Check(err2, Equals, nil)
1100         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1101 }
1102
1103 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1104         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1105
1106         st := StubGetIndexHandler{
1107                 c,
1108                 "/index/" + hash[0:3],
1109                 "abc123",
1110                 http.StatusOK,
1111                 []byte(hash + "+3 1443559274\n\n")}
1112
1113         ks := RunFakeKeepServer(st)
1114         defer ks.listener.Close()
1115
1116         arv, err := arvadosclient.MakeArvadosClient()
1117         kc, _ := MakeKeepClient(&arv)
1118         arv.ApiToken = "abc123"
1119         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1120
1121         r, err := kc.GetIndex("x", hash[0:3])
1122         c.Check(err, Equals, nil)
1123
1124         content, err2 := ioutil.ReadAll(r)
1125         c.Check(err2, Equals, nil)
1126         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1127 }
1128
1129 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1130         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1131
1132         st := StubGetIndexHandler{
1133                 c,
1134                 "/index/" + hash[0:3],
1135                 "abc123",
1136                 http.StatusOK,
1137                 []byte(hash)}
1138
1139         ks := RunFakeKeepServer(st)
1140         defer ks.listener.Close()
1141
1142         arv, err := arvadosclient.MakeArvadosClient()
1143         kc, _ := MakeKeepClient(&arv)
1144         arv.ApiToken = "abc123"
1145         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1146
1147         _, err = kc.GetIndex("x", hash[0:3])
1148         c.Check(err, Equals, ErrIncompleteIndex)
1149 }
1150
1151 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1152         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1153
1154         st := StubGetIndexHandler{
1155                 c,
1156                 "/index/" + hash[0:3],
1157                 "abc123",
1158                 http.StatusOK,
1159                 []byte(hash)}
1160
1161         ks := RunFakeKeepServer(st)
1162         defer ks.listener.Close()
1163
1164         arv, err := arvadosclient.MakeArvadosClient()
1165         kc, _ := MakeKeepClient(&arv)
1166         arv.ApiToken = "abc123"
1167         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1168
1169         _, err = kc.GetIndex("y", hash[0:3])
1170         c.Check(err, Equals, ErrNoSuchKeepServer)
1171 }
1172
1173 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1174         st := StubGetIndexHandler{
1175                 c,
1176                 "/index/abcd",
1177                 "abc123",
1178                 http.StatusOK,
1179                 []byte("\n")}
1180
1181         ks := RunFakeKeepServer(st)
1182         defer ks.listener.Close()
1183
1184         arv, err := arvadosclient.MakeArvadosClient()
1185         kc, _ := MakeKeepClient(&arv)
1186         arv.ApiToken = "abc123"
1187         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1188
1189         r, err := kc.GetIndex("x", "abcd")
1190         c.Check(err, Equals, nil)
1191
1192         content, err2 := ioutil.ReadAll(r)
1193         c.Check(err2, Equals, nil)
1194         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1195 }
1196
1197 type FailThenSucceedPutHandler struct {
1198         handled        chan string
1199         count          int
1200         successhandler StubPutHandler
1201 }
1202
1203 func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1204         if h.count == 0 {
1205                 resp.WriteHeader(500)
1206                 h.count += 1
1207                 h.handled <- fmt.Sprintf("http://%s", req.Host)
1208         } else {
1209                 h.successhandler.ServeHTTP(resp, req)
1210         }
1211 }
1212
1213 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1214         st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
1215                 StubPutHandler{
1216                         c,
1217                         Md5String("foo"),
1218                         "abc123",
1219                         "foo",
1220                         make(chan string, 5)}}
1221
1222         arv, _ := arvadosclient.MakeArvadosClient()
1223         kc, _ := MakeKeepClient(&arv)
1224
1225         kc.Want_replicas = 2
1226         arv.ApiToken = "abc123"
1227         localRoots := make(map[string]string)
1228         writableLocalRoots := make(map[string]string)
1229
1230         ks := RunSomeFakeKeepServers(st, 2)
1231
1232         for i, k := range ks {
1233                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1234                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1235                 defer k.listener.Close()
1236         }
1237
1238         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1239
1240         hash, replicas, err := kc.PutB([]byte("foo"))
1241
1242         c.Check(err, Equals, nil)
1243         c.Check(hash, Equals, "")
1244         c.Check(replicas, Equals, 2)
1245 }