5824: Turn off debug printfs unless enabled by calling program.
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "crypto/md5"
5         "flag"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9         "git.curoverse.com/arvados.git/sdk/go/streamer"
10         . "gopkg.in/check.v1"
11         "io"
12         "io/ioutil"
13         "log"
14         "net"
15         "net/http"
16         "os"
17         "strings"
18         "testing"
19 )
20
21 // Gocheck boilerplate
22 func Test(t *testing.T) {
23         TestingT(t)
24 }
25
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
29
30 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31
32 // Tests that require the Keep server running
33 type ServerRequiredSuite struct{}
34
35 // Standalone tests
36 type StandaloneSuite struct{}
37
38 func pythonDir() string {
39         cwd, _ := os.Getwd()
40         return fmt.Sprintf("%s/../../python/tests", cwd)
41 }
42
43 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44         if *no_server {
45                 c.Skip("Skipping tests that require server")
46                 return
47         }
48         arvadostest.StartAPI()
49         arvadostest.StartKeep(2, false)
50 }
51
52 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
53         if *no_server {
54                 return
55         }
56         arvadostest.StopKeep(2)
57         arvadostest.StopAPI()
58 }
59
60 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
61         arv, err := arvadosclient.MakeArvadosClient()
62         c.Assert(err, Equals, nil)
63
64         kc, err := MakeKeepClient(&arv)
65
66         c.Assert(err, Equals, nil)
67         c.Check(len(kc.LocalRoots()), Equals, 2)
68         for _, root := range kc.LocalRoots() {
69                 c.Check(root, Matches, "http://localhost:\\d+")
70         }
71 }
72
73 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
74         arv, err := arvadosclient.MakeArvadosClient()
75         c.Assert(err, Equals, nil)
76
77         kc, err := MakeKeepClient(&arv)
78         c.Assert(kc.Want_replicas, Equals, 2)
79
80         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
81         kc, err = MakeKeepClient(&arv)
82         c.Assert(kc.Want_replicas, Equals, 3)
83
84         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
85         kc, err = MakeKeepClient(&arv)
86         c.Assert(kc.Want_replicas, Equals, 1)
87 }
88
89 type StubPutHandler struct {
90         c              *C
91         expectPath     string
92         expectApiToken string
93         expectBody     string
94         handled        chan string
95 }
96
97 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
98         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
99         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
100         body, err := ioutil.ReadAll(req.Body)
101         sph.c.Check(err, Equals, nil)
102         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
103         resp.WriteHeader(200)
104         sph.handled <- fmt.Sprintf("http://%s", req.Host)
105 }
106
107 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
108         var err error
109         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
110         if err != nil {
111                 panic(fmt.Sprintf("Could not listen on any port"))
112         }
113         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
114         go http.Serve(ks.listener, st)
115         return
116 }
117
118 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
119         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
120
121         ks := RunFakeKeepServer(st)
122         defer ks.listener.Close()
123
124         arv, _ := arvadosclient.MakeArvadosClient()
125         arv.ApiToken = "abc123"
126
127         kc, _ := MakeKeepClient(&arv)
128
129         reader, writer := io.Pipe()
130         upload_status := make(chan uploadStatus)
131
132         f(kc, ks.url, reader, writer, upload_status)
133 }
134
135 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
136         log.Printf("TestUploadToStubKeepServer")
137
138         st := StubPutHandler{
139                 c,
140                 "acbd18db4cc2f85cedef654fccc4a4d8",
141                 "abc123",
142                 "foo",
143                 make(chan string)}
144
145         UploadToStubHelper(c, st,
146                 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
147
148                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), 0)
149
150                         writer.Write([]byte("foo"))
151                         writer.Close()
152
153                         <-st.handled
154                         status := <-upload_status
155                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
156                 })
157
158         log.Printf("TestUploadToStubKeepServer done")
159 }
160
161 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
162         log.Printf("TestUploadToStubKeepServerBufferReader")
163
164         st := StubPutHandler{
165                 c,
166                 "acbd18db4cc2f85cedef654fccc4a4d8",
167                 "abc123",
168                 "foo",
169                 make(chan string)}
170
171         UploadToStubHelper(c, st,
172                 func(kc *KeepClient, url string, reader io.ReadCloser,
173                         writer io.WriteCloser, upload_status chan uploadStatus) {
174
175                         tr := streamer.AsyncStreamFromReader(512, reader)
176                         defer tr.Close()
177
178                         br1 := tr.MakeStreamReader()
179
180                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
181
182                         writer.Write([]byte("foo"))
183                         writer.Close()
184
185                         <-st.handled
186
187                         status := <-upload_status
188                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
189                 })
190
191         log.Printf("TestUploadToStubKeepServerBufferReader done")
192 }
193
194 type FailHandler struct {
195         handled chan string
196 }
197
198 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
199         resp.WriteHeader(500)
200         fh.handled <- fmt.Sprintf("http://%s", req.Host)
201 }
202
203 type FailThenSucceedHandler struct {
204         handled        chan string
205         count          int
206         successhandler StubGetHandler
207 }
208
209 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
210         if fh.count == 0 {
211                 resp.WriteHeader(500)
212                 fh.count += 1
213                 fh.handled <- fmt.Sprintf("http://%s", req.Host)
214         } else {
215                 fh.successhandler.ServeHTTP(resp, req)
216         }
217 }
218
219 type Error404Handler struct {
220         handled chan string
221 }
222
223 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
224         resp.WriteHeader(404)
225         fh.handled <- fmt.Sprintf("http://%s", req.Host)
226 }
227
228 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
229         log.Printf("TestFailedUploadToStubKeepServer")
230
231         st := FailHandler{
232                 make(chan string)}
233
234         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
235
236         UploadToStubHelper(c, st,
237                 func(kc *KeepClient, url string, reader io.ReadCloser,
238                         writer io.WriteCloser, upload_status chan uploadStatus) {
239
240                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, 0)
241
242                         writer.Write([]byte("foo"))
243                         writer.Close()
244
245                         <-st.handled
246
247                         status := <-upload_status
248                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
249                         c.Check(status.statusCode, Equals, 500)
250                 })
251         log.Printf("TestFailedUploadToStubKeepServer done")
252 }
253
254 type KeepServer struct {
255         listener net.Listener
256         url      string
257 }
258
259 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
260         ks = make([]KeepServer, n)
261
262         for i := 0; i < n; i += 1 {
263                 ks[i] = RunFakeKeepServer(st)
264         }
265
266         return ks
267 }
268
269 func (s *StandaloneSuite) TestPutB(c *C) {
270         log.Printf("TestPutB")
271
272         hash := Md5String("foo")
273
274         st := StubPutHandler{
275                 c,
276                 hash,
277                 "abc123",
278                 "foo",
279                 make(chan string, 5)}
280
281         arv, _ := arvadosclient.MakeArvadosClient()
282         kc, _ := MakeKeepClient(&arv)
283
284         kc.Want_replicas = 2
285         arv.ApiToken = "abc123"
286         localRoots := make(map[string]string)
287         writableLocalRoots := make(map[string]string)
288
289         ks := RunSomeFakeKeepServers(st, 5)
290
291         for i, k := range ks {
292                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
293                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
294                 defer k.listener.Close()
295         }
296
297         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
298
299         kc.PutB([]byte("foo"))
300
301         shuff := NewRootSorter(
302                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
303
304         s1 := <-st.handled
305         s2 := <-st.handled
306         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
307                 (s1 == shuff[1] && s2 == shuff[0]),
308                 Equals,
309                 true)
310
311         log.Printf("TestPutB done")
312 }
313
314 func (s *StandaloneSuite) TestPutHR(c *C) {
315         log.Printf("TestPutHR")
316
317         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
318
319         st := StubPutHandler{
320                 c,
321                 hash,
322                 "abc123",
323                 "foo",
324                 make(chan string, 5)}
325
326         arv, _ := arvadosclient.MakeArvadosClient()
327         kc, _ := MakeKeepClient(&arv)
328
329         kc.Want_replicas = 2
330         arv.ApiToken = "abc123"
331         localRoots := make(map[string]string)
332         writableLocalRoots := make(map[string]string)
333
334         ks := RunSomeFakeKeepServers(st, 5)
335
336         for i, k := range ks {
337                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
338                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
339                 defer k.listener.Close()
340         }
341
342         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
343
344         reader, writer := io.Pipe()
345
346         go func() {
347                 writer.Write([]byte("foo"))
348                 writer.Close()
349         }()
350
351         kc.PutHR(hash, reader, 3)
352
353         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
354         log.Print(shuff)
355
356         s1 := <-st.handled
357         s2 := <-st.handled
358
359         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
360                 (s1 == shuff[1] && s2 == shuff[0]),
361                 Equals,
362                 true)
363
364         log.Printf("TestPutHR done")
365 }
366
367 func (s *StandaloneSuite) TestPutWithFail(c *C) {
368         log.Printf("TestPutWithFail")
369
370         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
371
372         st := StubPutHandler{
373                 c,
374                 hash,
375                 "abc123",
376                 "foo",
377                 make(chan string, 4)}
378
379         fh := FailHandler{
380                 make(chan string, 1)}
381
382         arv, err := arvadosclient.MakeArvadosClient()
383         kc, _ := MakeKeepClient(&arv)
384
385         kc.Want_replicas = 2
386         arv.ApiToken = "abc123"
387         localRoots := make(map[string]string)
388         writableLocalRoots := make(map[string]string)
389
390         ks1 := RunSomeFakeKeepServers(st, 4)
391         ks2 := RunSomeFakeKeepServers(fh, 1)
392
393         for i, k := range ks1 {
394                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
395                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
396                 defer k.listener.Close()
397         }
398         for i, k := range ks2 {
399                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
400                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
401                 defer k.listener.Close()
402         }
403
404         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
405
406         shuff := NewRootSorter(
407                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
408
409         phash, replicas, err := kc.PutB([]byte("foo"))
410
411         <-fh.handled
412
413         c.Check(err, Equals, nil)
414         c.Check(phash, Equals, "")
415         c.Check(replicas, Equals, 2)
416
417         s1 := <-st.handled
418         s2 := <-st.handled
419
420         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
421                 (s1 == shuff[2] && s2 == shuff[1]),
422                 Equals,
423                 true)
424 }
425
426 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
427         log.Printf("TestPutWithTooManyFail")
428
429         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
430
431         st := StubPutHandler{
432                 c,
433                 hash,
434                 "abc123",
435                 "foo",
436                 make(chan string, 1)}
437
438         fh := FailHandler{
439                 make(chan string, 4)}
440
441         arv, err := arvadosclient.MakeArvadosClient()
442         kc, _ := MakeKeepClient(&arv)
443
444         kc.Want_replicas = 2
445         kc.Retries = 0
446         arv.ApiToken = "abc123"
447         localRoots := make(map[string]string)
448         writableLocalRoots := make(map[string]string)
449
450         ks1 := RunSomeFakeKeepServers(st, 1)
451         ks2 := RunSomeFakeKeepServers(fh, 4)
452
453         for i, k := range ks1 {
454                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
455                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
456                 defer k.listener.Close()
457         }
458         for i, k := range ks2 {
459                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
460                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
461                 defer k.listener.Close()
462         }
463
464         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
465
466         _, replicas, err := kc.PutB([]byte("foo"))
467
468         c.Check(err, Equals, InsufficientReplicasError)
469         c.Check(replicas, Equals, 1)
470         c.Check(<-st.handled, Equals, ks1[0].url)
471
472         log.Printf("TestPutWithTooManyFail done")
473 }
474
475 type StubGetHandler struct {
476         c              *C
477         expectPath     string
478         expectApiToken string
479         httpStatus     int
480         body           []byte
481 }
482
483 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
484         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
485         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
486         resp.WriteHeader(sgh.httpStatus)
487         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
488         resp.Write(sgh.body)
489 }
490
491 func (s *StandaloneSuite) TestGet(c *C) {
492         log.Printf("TestGet")
493
494         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
495
496         st := StubGetHandler{
497                 c,
498                 hash,
499                 "abc123",
500                 http.StatusOK,
501                 []byte("foo")}
502
503         ks := RunFakeKeepServer(st)
504         defer ks.listener.Close()
505
506         arv, err := arvadosclient.MakeArvadosClient()
507         kc, _ := MakeKeepClient(&arv)
508         arv.ApiToken = "abc123"
509         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
510
511         r, n, url2, err := kc.Get(hash)
512         defer r.Close()
513         c.Check(err, Equals, nil)
514         c.Check(n, Equals, int64(3))
515         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
516
517         content, err2 := ioutil.ReadAll(r)
518         c.Check(err2, Equals, nil)
519         c.Check(content, DeepEquals, []byte("foo"))
520
521         log.Printf("TestGet done")
522 }
523
524 func (s *StandaloneSuite) TestGet404(c *C) {
525         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
526
527         st := Error404Handler{make(chan string, 1)}
528
529         ks := RunFakeKeepServer(st)
530         defer ks.listener.Close()
531
532         arv, err := arvadosclient.MakeArvadosClient()
533         kc, _ := MakeKeepClient(&arv)
534         arv.ApiToken = "abc123"
535         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
536
537         r, n, url2, err := kc.Get(hash)
538         c.Check(err, Equals, BlockNotFound)
539         c.Check(n, Equals, int64(0))
540         c.Check(url2, Equals, "")
541         c.Check(r, Equals, nil)
542 }
543
544 func (s *StandaloneSuite) TestGetFail(c *C) {
545         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
546
547         st := FailHandler{make(chan string, 1)}
548
549         ks := RunFakeKeepServer(st)
550         defer ks.listener.Close()
551
552         arv, err := arvadosclient.MakeArvadosClient()
553         kc, _ := MakeKeepClient(&arv)
554         arv.ApiToken = "abc123"
555         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
556         kc.Retries = 0
557
558         r, n, url2, err := kc.Get(hash)
559         errNotFound, _ := err.(*ErrNotFound)
560         c.Check(errNotFound, NotNil)
561         c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
562         c.Check(errNotFound.Temporary(), Equals, true)
563         c.Check(n, Equals, int64(0))
564         c.Check(url2, Equals, "")
565         c.Check(r, Equals, nil)
566 }
567
568 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
569         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
570
571         st := &FailThenSucceedHandler{make(chan string, 1), 0,
572                 StubGetHandler{
573                         c,
574                         hash,
575                         "abc123",
576                         http.StatusOK,
577                         []byte("foo")}}
578
579         ks := RunFakeKeepServer(st)
580         defer ks.listener.Close()
581
582         arv, err := arvadosclient.MakeArvadosClient()
583         kc, _ := MakeKeepClient(&arv)
584         arv.ApiToken = "abc123"
585         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
586
587         r, n, url2, err := kc.Get(hash)
588         defer r.Close()
589         c.Check(err, Equals, nil)
590         c.Check(n, Equals, int64(3))
591         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
592
593         content, err2 := ioutil.ReadAll(r)
594         c.Check(err2, Equals, nil)
595         c.Check(content, DeepEquals, []byte("foo"))
596 }
597
598 func (s *StandaloneSuite) TestGetNetError(c *C) {
599         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
600
601         arv, err := arvadosclient.MakeArvadosClient()
602         kc, _ := MakeKeepClient(&arv)
603         arv.ApiToken = "abc123"
604         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
605
606         r, n, url2, err := kc.Get(hash)
607         errNotFound, _ := err.(*ErrNotFound)
608         c.Check(errNotFound, NotNil)
609         c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
610         c.Check(errNotFound.Temporary(), Equals, true)
611         c.Check(n, Equals, int64(0))
612         c.Check(url2, Equals, "")
613         c.Check(r, Equals, nil)
614 }
615
616 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
617         uuid := "zzzzz-bi6l4-123451234512345"
618         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
619
620         // This one shouldn't be used:
621         ks0 := RunFakeKeepServer(StubGetHandler{
622                 c,
623                 "error if used",
624                 "abc123",
625                 http.StatusOK,
626                 []byte("foo")})
627         defer ks0.listener.Close()
628         // This one should be used:
629         ks := RunFakeKeepServer(StubGetHandler{
630                 c,
631                 hash + "+K@" + uuid,
632                 "abc123",
633                 http.StatusOK,
634                 []byte("foo")})
635         defer ks.listener.Close()
636
637         arv, err := arvadosclient.MakeArvadosClient()
638         kc, _ := MakeKeepClient(&arv)
639         arv.ApiToken = "abc123"
640         kc.SetServiceRoots(
641                 map[string]string{"x": ks0.url},
642                 nil,
643                 map[string]string{uuid: ks.url})
644
645         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
646         defer r.Close()
647         c.Check(err, Equals, nil)
648         c.Check(n, Equals, int64(3))
649         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
650
651         content, err := ioutil.ReadAll(r)
652         c.Check(err, Equals, nil)
653         c.Check(content, DeepEquals, []byte("foo"))
654 }
655
656 // Use a service hint to fetch from a local disk service, overriding
657 // rendezvous probe order.
658 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
659         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
660         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
661
662         // This one shouldn't be used, although it appears first in
663         // rendezvous probe order:
664         ks0 := RunFakeKeepServer(StubGetHandler{
665                 c,
666                 "error if used",
667                 "abc123",
668                 http.StatusOK,
669                 []byte("foo")})
670         defer ks0.listener.Close()
671         // This one should be used:
672         ks := RunFakeKeepServer(StubGetHandler{
673                 c,
674                 hash + "+K@" + uuid,
675                 "abc123",
676                 http.StatusOK,
677                 []byte("foo")})
678         defer ks.listener.Close()
679
680         arv, err := arvadosclient.MakeArvadosClient()
681         kc, _ := MakeKeepClient(&arv)
682         arv.ApiToken = "abc123"
683         kc.SetServiceRoots(
684                 map[string]string{
685                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
686                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
687                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
688                         uuid: ks.url},
689                 nil,
690                 map[string]string{
691                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
692                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
693                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
694                         uuid: ks.url},
695         )
696
697         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
698         defer r.Close()
699         c.Check(err, Equals, nil)
700         c.Check(n, Equals, int64(3))
701         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
702
703         content, err := ioutil.ReadAll(r)
704         c.Check(err, Equals, nil)
705         c.Check(content, DeepEquals, []byte("foo"))
706 }
707
708 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
709         uuid := "zzzzz-bi6l4-123451234512345"
710         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
711
712         ksLocal := RunFakeKeepServer(StubGetHandler{
713                 c,
714                 hash + "+K@" + uuid,
715                 "abc123",
716                 http.StatusOK,
717                 []byte("foo")})
718         defer ksLocal.listener.Close()
719         ksGateway := RunFakeKeepServer(StubGetHandler{
720                 c,
721                 hash + "+K@" + uuid,
722                 "abc123",
723                 http.StatusInternalServerError,
724                 []byte("Error")})
725         defer ksGateway.listener.Close()
726
727         arv, err := arvadosclient.MakeArvadosClient()
728         kc, _ := MakeKeepClient(&arv)
729         arv.ApiToken = "abc123"
730         kc.SetServiceRoots(
731                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
732                 nil,
733                 map[string]string{uuid: ksGateway.url})
734
735         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
736         c.Assert(err, Equals, nil)
737         defer r.Close()
738         c.Check(n, Equals, int64(3))
739         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
740
741         content, err := ioutil.ReadAll(r)
742         c.Check(err, Equals, nil)
743         c.Check(content, DeepEquals, []byte("foo"))
744 }
745
746 type BarHandler struct {
747         handled chan string
748 }
749
750 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
751         resp.Write([]byte("bar"))
752         this.handled <- fmt.Sprintf("http://%s", req.Host)
753 }
754
755 func (s *StandaloneSuite) TestChecksum(c *C) {
756         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
757         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
758
759         st := BarHandler{make(chan string, 1)}
760
761         ks := RunFakeKeepServer(st)
762         defer ks.listener.Close()
763
764         arv, err := arvadosclient.MakeArvadosClient()
765         kc, _ := MakeKeepClient(&arv)
766         arv.ApiToken = "abc123"
767         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
768
769         r, n, _, err := kc.Get(barhash)
770         _, err = ioutil.ReadAll(r)
771         c.Check(n, Equals, int64(3))
772         c.Check(err, Equals, nil)
773
774         <-st.handled
775
776         r, n, _, err = kc.Get(foohash)
777         _, err = ioutil.ReadAll(r)
778         c.Check(n, Equals, int64(3))
779         c.Check(err, Equals, BadChecksum)
780
781         <-st.handled
782 }
783
784 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
785         content := []byte("waz")
786         hash := fmt.Sprintf("%x", md5.Sum(content))
787
788         fh := Error404Handler{
789                 make(chan string, 4)}
790
791         st := StubGetHandler{
792                 c,
793                 hash,
794                 "abc123",
795                 http.StatusOK,
796                 content}
797
798         arv, err := arvadosclient.MakeArvadosClient()
799         kc, _ := MakeKeepClient(&arv)
800         arv.ApiToken = "abc123"
801         localRoots := make(map[string]string)
802         writableLocalRoots := make(map[string]string)
803
804         ks1 := RunSomeFakeKeepServers(st, 1)
805         ks2 := RunSomeFakeKeepServers(fh, 4)
806
807         for i, k := range ks1 {
808                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
809                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
810                 defer k.listener.Close()
811         }
812         for i, k := range ks2 {
813                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
814                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
815                 defer k.listener.Close()
816         }
817
818         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
819         kc.Retries = 0
820
821         // This test works only if one of the failing services is
822         // attempted before the succeeding service. Otherwise,
823         // <-fh.handled below will just hang! (Probe order depends on
824         // the choice of block content "waz" and the UUIDs of the fake
825         // servers, so we just tried different strings until we found
826         // an example that passes this Assert.)
827         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
828
829         r, n, url2, err := kc.Get(hash)
830
831         <-fh.handled
832         c.Check(err, Equals, nil)
833         c.Check(n, Equals, int64(3))
834         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
835
836         read_content, err2 := ioutil.ReadAll(r)
837         c.Check(err2, Equals, nil)
838         c.Check(read_content, DeepEquals, content)
839 }
840
841 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
842         content := []byte("TestPutGetHead")
843
844         arv, err := arvadosclient.MakeArvadosClient()
845         kc, err := MakeKeepClient(&arv)
846         c.Assert(err, Equals, nil)
847
848         hash := fmt.Sprintf("%x", md5.Sum(content))
849
850         {
851                 n, _, err := kc.Ask(hash)
852                 c.Check(err, Equals, BlockNotFound)
853                 c.Check(n, Equals, int64(0))
854         }
855         {
856                 hash2, replicas, err := kc.PutB(content)
857                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
858                 c.Check(replicas, Equals, 2)
859                 c.Check(err, Equals, nil)
860         }
861         {
862                 r, n, url2, err := kc.Get(hash)
863                 c.Check(err, Equals, nil)
864                 c.Check(n, Equals, int64(len(content)))
865                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
866
867                 read_content, err2 := ioutil.ReadAll(r)
868                 c.Check(err2, Equals, nil)
869                 c.Check(read_content, DeepEquals, content)
870         }
871         {
872                 n, url2, err := kc.Ask(hash)
873                 c.Check(err, Equals, nil)
874                 c.Check(n, Equals, int64(len(content)))
875                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
876         }
877 }
878
879 type StubProxyHandler struct {
880         handled chan string
881 }
882
883 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
884         resp.Header().Set("X-Keep-Replicas-Stored", "2")
885         this.handled <- fmt.Sprintf("http://%s", req.Host)
886 }
887
888 func (s *StandaloneSuite) TestPutProxy(c *C) {
889         log.Printf("TestPutProxy")
890
891         st := StubProxyHandler{make(chan string, 1)}
892
893         arv, err := arvadosclient.MakeArvadosClient()
894         kc, _ := MakeKeepClient(&arv)
895
896         kc.Want_replicas = 2
897         kc.Using_proxy = true
898         arv.ApiToken = "abc123"
899         localRoots := make(map[string]string)
900         writableLocalRoots := make(map[string]string)
901
902         ks1 := RunSomeFakeKeepServers(st, 1)
903
904         for i, k := range ks1 {
905                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
906                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
907                 defer k.listener.Close()
908         }
909
910         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
911
912         _, replicas, err := kc.PutB([]byte("foo"))
913         <-st.handled
914
915         c.Check(err, Equals, nil)
916         c.Check(replicas, Equals, 2)
917
918         log.Printf("TestPutProxy done")
919 }
920
921 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
922         log.Printf("TestPutProxy")
923
924         st := StubProxyHandler{make(chan string, 1)}
925
926         arv, err := arvadosclient.MakeArvadosClient()
927         kc, _ := MakeKeepClient(&arv)
928
929         kc.Want_replicas = 3
930         kc.Using_proxy = true
931         arv.ApiToken = "abc123"
932         localRoots := make(map[string]string)
933         writableLocalRoots := make(map[string]string)
934
935         ks1 := RunSomeFakeKeepServers(st, 1)
936
937         for i, k := range ks1 {
938                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
939                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
940                 defer k.listener.Close()
941         }
942         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
943
944         _, replicas, err := kc.PutB([]byte("foo"))
945         <-st.handled
946
947         c.Check(err, Equals, InsufficientReplicasError)
948         c.Check(replicas, Equals, 2)
949
950         log.Printf("TestPutProxy done")
951 }
952
953 func (s *StandaloneSuite) TestMakeLocator(c *C) {
954         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
955         c.Check(err, Equals, nil)
956         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
957         c.Check(l.Size, Equals, 3)
958         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
959 }
960
961 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
962         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
963         c.Check(err, Equals, nil)
964         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
965         c.Check(l.Size, Equals, -1)
966         c.Check(l.Hints, DeepEquals, []string{})
967 }
968
969 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
970         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
971         c.Check(err, Equals, nil)
972         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
973         c.Check(l.Size, Equals, -1)
974         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
975 }
976
977 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
978         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
979         l, err := MakeLocator(str)
980         c.Check(err, Equals, nil)
981         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
982         c.Check(l.Size, Equals, 3)
983         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
984         c.Check(l.String(), Equals, str)
985 }
986
987 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
988         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
989         c.Check(err, Equals, InvalidLocatorError)
990 }
991
992 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
993         hash := Md5String("foo")
994
995         st := StubPutHandler{
996                 c,
997                 hash,
998                 "abc123",
999                 "foo",
1000                 make(chan string, 5)}
1001
1002         arv, _ := arvadosclient.MakeArvadosClient()
1003         kc, _ := MakeKeepClient(&arv)
1004
1005         kc.Want_replicas = 2
1006         arv.ApiToken = "abc123"
1007         localRoots := make(map[string]string)
1008         writableLocalRoots := make(map[string]string)
1009
1010         ks := RunSomeFakeKeepServers(st, 5)
1011
1012         for i, k := range ks {
1013                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1014                 if i == 0 {
1015                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1016                 }
1017                 defer k.listener.Close()
1018         }
1019
1020         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1021
1022         _, replicas, err := kc.PutB([]byte("foo"))
1023
1024         c.Check(err, Equals, InsufficientReplicasError)
1025         c.Check(replicas, Equals, 1)
1026
1027         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1028 }
1029
1030 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1031         hash := Md5String("foo")
1032
1033         st := StubPutHandler{
1034                 c,
1035                 hash,
1036                 "abc123",
1037                 "foo",
1038                 make(chan string, 5)}
1039
1040         arv, _ := arvadosclient.MakeArvadosClient()
1041         kc, _ := MakeKeepClient(&arv)
1042
1043         kc.Want_replicas = 2
1044         arv.ApiToken = "abc123"
1045         localRoots := make(map[string]string)
1046         writableLocalRoots := make(map[string]string)
1047
1048         ks := RunSomeFakeKeepServers(st, 5)
1049
1050         for i, k := range ks {
1051                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1052                 defer k.listener.Close()
1053         }
1054
1055         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1056
1057         _, replicas, err := kc.PutB([]byte("foo"))
1058
1059         c.Check(err, Equals, InsufficientReplicasError)
1060         c.Check(replicas, Equals, 0)
1061 }
1062
1063 type StubGetIndexHandler struct {
1064         c              *C
1065         expectPath     string
1066         expectAPIToken string
1067         httpStatus     int
1068         body           []byte
1069 }
1070
1071 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1072         h.c.Check(req.URL.Path, Equals, h.expectPath)
1073         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1074         resp.WriteHeader(h.httpStatus)
1075         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1076         resp.Write(h.body)
1077 }
1078
1079 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1080         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1081
1082         st := StubGetIndexHandler{
1083                 c,
1084                 "/index",
1085                 "abc123",
1086                 http.StatusOK,
1087                 []byte(hash + "+3 1443559274\n\n")}
1088
1089         ks := RunFakeKeepServer(st)
1090         defer ks.listener.Close()
1091
1092         arv, err := arvadosclient.MakeArvadosClient()
1093         kc, _ := MakeKeepClient(&arv)
1094         arv.ApiToken = "abc123"
1095         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1096
1097         r, err := kc.GetIndex("x", "")
1098         c.Check(err, Equals, nil)
1099
1100         content, err2 := ioutil.ReadAll(r)
1101         c.Check(err2, Equals, nil)
1102         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1103 }
1104
1105 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1106         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1107
1108         st := StubGetIndexHandler{
1109                 c,
1110                 "/index/" + hash[0:3],
1111                 "abc123",
1112                 http.StatusOK,
1113                 []byte(hash + "+3 1443559274\n\n")}
1114
1115         ks := RunFakeKeepServer(st)
1116         defer ks.listener.Close()
1117
1118         arv, err := arvadosclient.MakeArvadosClient()
1119         kc, _ := MakeKeepClient(&arv)
1120         arv.ApiToken = "abc123"
1121         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1122
1123         r, err := kc.GetIndex("x", hash[0:3])
1124         c.Check(err, Equals, nil)
1125
1126         content, err2 := ioutil.ReadAll(r)
1127         c.Check(err2, Equals, nil)
1128         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1129 }
1130
1131 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1132         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1133
1134         st := StubGetIndexHandler{
1135                 c,
1136                 "/index/" + hash[0:3],
1137                 "abc123",
1138                 http.StatusOK,
1139                 []byte(hash)}
1140
1141         ks := RunFakeKeepServer(st)
1142         defer ks.listener.Close()
1143
1144         arv, err := arvadosclient.MakeArvadosClient()
1145         kc, _ := MakeKeepClient(&arv)
1146         arv.ApiToken = "abc123"
1147         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1148
1149         _, err = kc.GetIndex("x", hash[0:3])
1150         c.Check(err, Equals, ErrIncompleteIndex)
1151 }
1152
1153 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1154         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1155
1156         st := StubGetIndexHandler{
1157                 c,
1158                 "/index/" + hash[0:3],
1159                 "abc123",
1160                 http.StatusOK,
1161                 []byte(hash)}
1162
1163         ks := RunFakeKeepServer(st)
1164         defer ks.listener.Close()
1165
1166         arv, err := arvadosclient.MakeArvadosClient()
1167         kc, _ := MakeKeepClient(&arv)
1168         arv.ApiToken = "abc123"
1169         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1170
1171         _, err = kc.GetIndex("y", hash[0:3])
1172         c.Check(err, Equals, ErrNoSuchKeepServer)
1173 }
1174
1175 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1176         st := StubGetIndexHandler{
1177                 c,
1178                 "/index/abcd",
1179                 "abc123",
1180                 http.StatusOK,
1181                 []byte("\n")}
1182
1183         ks := RunFakeKeepServer(st)
1184         defer ks.listener.Close()
1185
1186         arv, err := arvadosclient.MakeArvadosClient()
1187         kc, _ := MakeKeepClient(&arv)
1188         arv.ApiToken = "abc123"
1189         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1190
1191         r, err := kc.GetIndex("x", "abcd")
1192         c.Check(err, Equals, nil)
1193
1194         content, err2 := ioutil.ReadAll(r)
1195         c.Check(err2, Equals, nil)
1196         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1197 }
1198
1199 type FailThenSucceedPutHandler struct {
1200         handled        chan string
1201         count          int
1202         successhandler StubPutHandler
1203 }
1204
1205 func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1206         if h.count == 0 {
1207                 resp.WriteHeader(500)
1208                 h.count += 1
1209                 h.handled <- fmt.Sprintf("http://%s", req.Host)
1210         } else {
1211                 h.successhandler.ServeHTTP(resp, req)
1212         }
1213 }
1214
1215 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1216         st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
1217                 StubPutHandler{
1218                         c,
1219                         Md5String("foo"),
1220                         "abc123",
1221                         "foo",
1222                         make(chan string, 5)}}
1223
1224         arv, _ := arvadosclient.MakeArvadosClient()
1225         kc, _ := MakeKeepClient(&arv)
1226
1227         kc.Want_replicas = 2
1228         arv.ApiToken = "abc123"
1229         localRoots := make(map[string]string)
1230         writableLocalRoots := make(map[string]string)
1231
1232         ks := RunSomeFakeKeepServers(st, 2)
1233
1234         for i, k := range ks {
1235                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1236                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1237                 defer k.listener.Close()
1238         }
1239
1240         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1241
1242         hash, replicas, err := kc.PutB([]byte("foo"))
1243
1244         c.Check(err, Equals, nil)
1245         c.Check(hash, Equals, "")
1246         c.Check(replicas, Equals, 2)
1247 }