c52e07b8f6ea3b6ea417f9843049e3d99b986fa5
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "net"
17         "net/http"
18         "os"
19         "strings"
20         "sync"
21         "testing"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
26         "git.arvados.org/arvados.git/sdk/go/arvadostest"
27         . "gopkg.in/check.v1"
28         check "gopkg.in/check.v1"
29 )
30
31 // Gocheck boilerplate
32 func Test(t *testing.T) {
33         TestingT(t)
34 }
35
36 // Gocheck boilerplate
37 var _ = Suite(&ServerRequiredSuite{})
38 var _ = Suite(&StandaloneSuite{})
39
40 // Tests that require the Keep server running
41 type ServerRequiredSuite struct{}
42
43 // Standalone tests
44 type StandaloneSuite struct{}
45
46 func (s *StandaloneSuite) SetUpTest(c *C) {
47         RefreshServiceDiscovery()
48 }
49
// pythonDir returns the path "<cwd>/../../python/tests", where <cwd> is the
// current working directory. An error from os.Getwd is ignored, in which
// case the prefix is whatever os.Getwd returned alongside it.
func pythonDir() string {
	wd, _ := os.Getwd()
	return fmt.Sprintf("%s/../../python/tests", wd)
}
54
55 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
56         arvadostest.StartAPI()
57         arvadostest.StartKeep(2, false)
58 }
59
60 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
61         arvadostest.StopKeep(2)
62         arvadostest.StopAPI()
63 }
64
65 func (s *ServerRequiredSuite) SetUpTest(c *C) {
66         RefreshServiceDiscovery()
67 }
68
69 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
70         arv, err := arvadosclient.MakeArvadosClient()
71         c.Assert(err, Equals, nil)
72
73         kc, err := MakeKeepClient(arv)
74
75         c.Assert(err, Equals, nil)
76         c.Check(len(kc.LocalRoots()), Equals, 2)
77         for _, root := range kc.LocalRoots() {
78                 c.Check(root, Matches, "http://localhost:\\d+")
79         }
80 }
81
82 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
83         arv, err := arvadosclient.MakeArvadosClient()
84         c.Assert(err, Equals, nil)
85
86         kc, err := MakeKeepClient(arv)
87         c.Check(err, IsNil)
88         c.Assert(kc.Want_replicas, Equals, 2)
89
90         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
91         kc, err = MakeKeepClient(arv)
92         c.Check(err, IsNil)
93         c.Assert(kc.Want_replicas, Equals, 3)
94
95         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
96         kc, err = MakeKeepClient(arv)
97         c.Check(err, IsNil)
98         c.Assert(kc.Want_replicas, Equals, 1)
99 }
100
101 type StubPutHandler struct {
102         c                    *C
103         expectPath           string
104         expectAPIToken       string
105         expectBody           string
106         expectStorageClass   string
107         returnStorageClasses string
108         handled              chan string
109         requests             []*http.Request
110         mtx                  sync.Mutex
111 }
112
113 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
114         sph.mtx.Lock()
115         sph.requests = append(sph.requests, req)
116         sph.mtx.Unlock()
117         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
118         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
119         if sph.expectStorageClass != "*" {
120                 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
121         }
122         body, err := ioutil.ReadAll(req.Body)
123         sph.c.Check(err, Equals, nil)
124         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
125         resp.Header().Set("X-Keep-Replicas-Stored", "1")
126         if sph.returnStorageClasses != "" {
127                 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
128         }
129         resp.WriteHeader(200)
130         sph.handled <- fmt.Sprintf("http://%s", req.Host)
131 }
132
133 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
134         var err error
135         // If we don't explicitly bind it to localhost, ks.listener.Addr() will
136         // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
137         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
138         if err != nil {
139                 panic(fmt.Sprintf("Could not listen on any port"))
140         }
141         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
142         go http.Serve(ks.listener, st)
143         return
144 }
145
146 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
147         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
148
149         ks := RunFakeKeepServer(st)
150         defer ks.listener.Close()
151
152         arv, _ := arvadosclient.MakeArvadosClient()
153         arv.ApiToken = "abc123"
154
155         kc, _ := MakeKeepClient(arv)
156
157         reader, writer := io.Pipe()
158         uploadStatusChan := make(chan uploadStatus)
159
160         f(kc, ks.url, reader, writer, uploadStatusChan)
161 }
162
163 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
164         log.Printf("TestUploadToStubKeepServer")
165
166         st := &StubPutHandler{
167                 c:                    c,
168                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
169                 expectAPIToken:       "abc123",
170                 expectBody:           "foo",
171                 expectStorageClass:   "",
172                 returnStorageClasses: "default=1",
173                 handled:              make(chan string),
174         }
175
176         UploadToStubHelper(c, st,
177                 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
178                         go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
179
180                         writer.Write([]byte("foo"))
181                         writer.Close()
182
183                         <-st.handled
184                         status := <-uploadStatusChan
185                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
186                 })
187 }
188
189 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
190         st := &StubPutHandler{
191                 c:                    c,
192                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
193                 expectAPIToken:       "abc123",
194                 expectBody:           "foo",
195                 expectStorageClass:   "",
196                 returnStorageClasses: "default=1",
197                 handled:              make(chan string),
198         }
199
200         UploadToStubHelper(c, st,
201                 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
202                         go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
203
204                         <-st.handled
205
206                         status := <-uploadStatusChan
207                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
208                 })
209 }
210
211 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
212         for _, trial := range []struct {
213                 respHeader string
214                 expectMap  map[string]int
215         }{
216                 {"", nil},
217                 {"foo=1", map[string]int{"foo": 1}},
218                 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
219                 {" =foo=1 ", nil},
220                 {"foo", nil},
221         } {
222                 st := &StubPutHandler{
223                         c:                    c,
224                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
225                         expectAPIToken:       "abc123",
226                         expectBody:           "foo",
227                         expectStorageClass:   "",
228                         returnStorageClasses: trial.respHeader,
229                         handled:              make(chan string),
230                 }
231
232                 UploadToStubHelper(c, st,
233                         func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
234                                 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
235
236                                 writer.Write([]byte("foo"))
237                                 writer.Close()
238
239                                 <-st.handled
240                                 status := <-uploadStatusChan
241                                 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
242                         })
243         }
244 }
245
246 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
247         nServers := 5
248         for _, trial := range []struct {
249                 replicas      int
250                 clientClasses []string
251                 putClasses    []string // putClasses takes precedence over clientClasses
252                 minRequests   int
253                 maxRequests   int
254                 success       bool
255         }{
256                 {1, []string{"class1"}, nil, 1, 1, true},
257                 {2, []string{"class1"}, nil, 1, 2, true},
258                 {3, []string{"class1"}, nil, 2, 3, true},
259                 {1, []string{"class1", "class2"}, nil, 1, 1, true},
260                 {3, nil, []string{"class1"}, 2, 3, true},
261                 {1, nil, []string{"class1", "class2"}, 1, 1, true},
262                 {1, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
263                 {1, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
264                 {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
265                 {1, []string{"class404"}, nil, nServers, nServers, false},
266                 {1, []string{"class1", "class404"}, nil, nServers, nServers, false},
267                 {1, nil, []string{"class1", "class404"}, nServers, nServers, false},
268         } {
269                 c.Logf("%+v", trial)
270                 st := &StubPutHandler{
271                         c:                    c,
272                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
273                         expectAPIToken:       "abc123",
274                         expectBody:           "foo",
275                         expectStorageClass:   "*",
276                         returnStorageClasses: "class1=2, class2=2",
277                         handled:              make(chan string, 100),
278                 }
279                 ks := RunSomeFakeKeepServers(st, nServers)
280                 arv, _ := arvadosclient.MakeArvadosClient()
281                 kc, _ := MakeKeepClient(arv)
282                 kc.Want_replicas = trial.replicas
283                 kc.StorageClasses = trial.clientClasses
284                 arv.ApiToken = "abc123"
285                 localRoots := make(map[string]string)
286                 writableLocalRoots := make(map[string]string)
287                 for i, k := range ks {
288                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
289                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
290                         defer k.listener.Close()
291                 }
292                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
293
294                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
295                         Data:           []byte("foo"),
296                         StorageClasses: trial.putClasses,
297                 })
298                 if trial.success {
299                         c.Check(err, check.IsNil)
300                 } else {
301                         c.Check(err, check.NotNil)
302                 }
303                 c.Check(len(st.handled) >= trial.minRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
304                 c.Check(len(st.handled) <= trial.maxRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
305                 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, check.Equals, true) {
306                         // Max concurrency should be 1. First request
307                         // should have succeeded for class1. Second
308                         // request should only ask for class404.
309                         c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), check.Equals, "class404")
310                 }
311         }
312 }
313
// FailHandler is a stub HTTP handler that answers every request with
// status 500 and then reports "http://<request host>" on handled.
type FailHandler struct {
	handled chan string
}

// ServeHTTP writes the 500 status before sending on handled, so the send
// blocks the handler if the channel is full or has no receiver.
func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
322
// FailThenSucceedHandler answers its first request with status 500 and
// delegates every later request to successhandler. The X-Request-Id header
// of every request, failed or not, is appended to reqIDs.
type FailThenSucceedHandler struct {
	handled        chan string // receives "http://<host>" for the failed request only
	count          int         // number of requests answered with 500 so far
	successhandler http.Handler
	reqIDs         []string
}

// ServeHTTP records the request ID, then either fails the request (the
// first time) or hands it to successhandler.
func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
	if fh.count != 0 {
		fh.successhandler.ServeHTTP(resp, req)
		return
	}
	resp.WriteHeader(500)
	fh.count++
	fh.handled <- fmt.Sprintf("http://%s", req.Host)
}
340
// Error404Handler is a stub HTTP handler that answers every request with
// status 404 and then reports "http://<request host>" on handled.
type Error404Handler struct {
	handled chan string
}

// ServeHTTP writes the 404 status before sending on handled, so the send
// blocks the handler if the channel is full or has no receiver.
func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusNotFound)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
349
350 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
351         st := FailHandler{
352                 make(chan string)}
353
354         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
355
356         UploadToStubHelper(c, st,
357                 func(kc *KeepClient, url string, reader io.ReadCloser,
358                         writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
359
360                         go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
361
362                         writer.Write([]byte("foo"))
363                         writer.Close()
364
365                         <-st.handled
366
367                         status := <-uploadStatusChan
368                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
369                         c.Check(status.statusCode, Equals, 500)
370                 })
371 }
372
// KeepServer describes one fake keep server started by RunFakeKeepServer.
type KeepServer struct {
	// listener is the socket the fake server accepts connections on;
	// tests close it to shut the server down.
	listener net.Listener
	// url is the server's base URL, "http://<listener address>".
	url string
}
377
378 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
379         ks = make([]KeepServer, n)
380
381         for i := 0; i < n; i++ {
382                 ks[i] = RunFakeKeepServer(st)
383         }
384
385         return ks
386 }
387
388 func (s *StandaloneSuite) TestPutB(c *C) {
389         hash := Md5String("foo")
390
391         st := &StubPutHandler{
392                 c:                    c,
393                 expectPath:           hash,
394                 expectAPIToken:       "abc123",
395                 expectBody:           "foo",
396                 expectStorageClass:   "",
397                 returnStorageClasses: "",
398                 handled:              make(chan string, 5),
399         }
400
401         arv, _ := arvadosclient.MakeArvadosClient()
402         kc, _ := MakeKeepClient(arv)
403
404         kc.Want_replicas = 2
405         arv.ApiToken = "abc123"
406         localRoots := make(map[string]string)
407         writableLocalRoots := make(map[string]string)
408
409         ks := RunSomeFakeKeepServers(st, 5)
410
411         for i, k := range ks {
412                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
413                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
414                 defer k.listener.Close()
415         }
416
417         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
418
419         kc.PutB([]byte("foo"))
420
421         shuff := NewRootSorter(
422                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
423
424         s1 := <-st.handled
425         s2 := <-st.handled
426         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
427                 (s1 == shuff[1] && s2 == shuff[0]),
428                 Equals,
429                 true)
430 }
431
432 func (s *StandaloneSuite) TestPutHR(c *C) {
433         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
434
435         st := &StubPutHandler{
436                 c:                    c,
437                 expectPath:           hash,
438                 expectAPIToken:       "abc123",
439                 expectBody:           "foo",
440                 expectStorageClass:   "",
441                 returnStorageClasses: "",
442                 handled:              make(chan string, 5),
443         }
444
445         arv, _ := arvadosclient.MakeArvadosClient()
446         kc, _ := MakeKeepClient(arv)
447
448         kc.Want_replicas = 2
449         arv.ApiToken = "abc123"
450         localRoots := make(map[string]string)
451         writableLocalRoots := make(map[string]string)
452
453         ks := RunSomeFakeKeepServers(st, 5)
454
455         for i, k := range ks {
456                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
457                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
458                 defer k.listener.Close()
459         }
460
461         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
462
463         reader, writer := io.Pipe()
464
465         go func() {
466                 writer.Write([]byte("foo"))
467                 writer.Close()
468         }()
469
470         kc.PutHR(hash, reader, 3)
471
472         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
473
474         s1 := <-st.handled
475         s2 := <-st.handled
476
477         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
478                 (s1 == shuff[1] && s2 == shuff[0]),
479                 Equals,
480                 true)
481 }
482
483 func (s *StandaloneSuite) TestPutWithFail(c *C) {
484         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
485
486         st := &StubPutHandler{
487                 c:                    c,
488                 expectPath:           hash,
489                 expectAPIToken:       "abc123",
490                 expectBody:           "foo",
491                 expectStorageClass:   "",
492                 returnStorageClasses: "",
493                 handled:              make(chan string, 4),
494         }
495
496         fh := FailHandler{
497                 make(chan string, 1)}
498
499         arv, err := arvadosclient.MakeArvadosClient()
500         c.Check(err, IsNil)
501         kc, _ := MakeKeepClient(arv)
502
503         kc.Want_replicas = 2
504         arv.ApiToken = "abc123"
505         localRoots := make(map[string]string)
506         writableLocalRoots := make(map[string]string)
507
508         ks1 := RunSomeFakeKeepServers(st, 4)
509         ks2 := RunSomeFakeKeepServers(fh, 1)
510
511         for i, k := range ks1 {
512                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
513                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
514                 defer k.listener.Close()
515         }
516         for i, k := range ks2 {
517                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
518                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
519                 defer k.listener.Close()
520         }
521
522         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
523
524         shuff := NewRootSorter(
525                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
526         c.Logf("%+v", shuff)
527
528         phash, replicas, err := kc.PutB([]byte("foo"))
529
530         <-fh.handled
531
532         c.Check(err, Equals, nil)
533         c.Check(phash, Equals, "")
534         c.Check(replicas, Equals, 2)
535
536         s1 := <-st.handled
537         s2 := <-st.handled
538
539         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
540                 (s1 == shuff[2] && s2 == shuff[1]),
541                 Equals,
542                 true)
543 }
544
545 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
546         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
547
548         st := &StubPutHandler{
549                 c:                    c,
550                 expectPath:           hash,
551                 expectAPIToken:       "abc123",
552                 expectBody:           "foo",
553                 expectStorageClass:   "",
554                 returnStorageClasses: "",
555                 handled:              make(chan string, 1),
556         }
557
558         fh := FailHandler{
559                 make(chan string, 4)}
560
561         arv, err := arvadosclient.MakeArvadosClient()
562         c.Check(err, IsNil)
563         kc, _ := MakeKeepClient(arv)
564
565         kc.Want_replicas = 2
566         kc.Retries = 0
567         arv.ApiToken = "abc123"
568         localRoots := make(map[string]string)
569         writableLocalRoots := make(map[string]string)
570
571         ks1 := RunSomeFakeKeepServers(st, 1)
572         ks2 := RunSomeFakeKeepServers(fh, 4)
573
574         for i, k := range ks1 {
575                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
576                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
577                 defer k.listener.Close()
578         }
579         for i, k := range ks2 {
580                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
581                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
582                 defer k.listener.Close()
583         }
584
585         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
586
587         _, replicas, err := kc.PutB([]byte("foo"))
588
589         c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
590         c.Check(replicas, Equals, 1)
591         c.Check(<-st.handled, Equals, ks1[0].url)
592 }
593
594 type StubGetHandler struct {
595         c              *C
596         expectPath     string
597         expectAPIToken string
598         httpStatus     int
599         body           []byte
600 }
601
602 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
603         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
604         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
605         resp.WriteHeader(sgh.httpStatus)
606         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
607         resp.Write(sgh.body)
608 }
609
610 func (s *StandaloneSuite) TestGet(c *C) {
611         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
612
613         st := StubGetHandler{
614                 c,
615                 hash,
616                 "abc123",
617                 http.StatusOK,
618                 []byte("foo")}
619
620         ks := RunFakeKeepServer(st)
621         defer ks.listener.Close()
622
623         arv, err := arvadosclient.MakeArvadosClient()
624         c.Check(err, IsNil)
625         kc, _ := MakeKeepClient(arv)
626         arv.ApiToken = "abc123"
627         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
628
629         r, n, url2, err := kc.Get(hash)
630         defer r.Close()
631         c.Check(err, Equals, nil)
632         c.Check(n, Equals, int64(3))
633         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
634
635         content, err2 := ioutil.ReadAll(r)
636         c.Check(err2, Equals, nil)
637         c.Check(content, DeepEquals, []byte("foo"))
638 }
639
640 func (s *StandaloneSuite) TestGet404(c *C) {
641         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
642
643         st := Error404Handler{make(chan string, 1)}
644
645         ks := RunFakeKeepServer(st)
646         defer ks.listener.Close()
647
648         arv, err := arvadosclient.MakeArvadosClient()
649         c.Check(err, IsNil)
650         kc, _ := MakeKeepClient(arv)
651         arv.ApiToken = "abc123"
652         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
653
654         r, n, url2, err := kc.Get(hash)
655         c.Check(err, Equals, BlockNotFound)
656         c.Check(n, Equals, int64(0))
657         c.Check(url2, Equals, "")
658         c.Check(r, Equals, nil)
659 }
660
661 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
662         st := Error404Handler{make(chan string, 1)}
663
664         ks := RunFakeKeepServer(st)
665         defer ks.listener.Close()
666
667         arv, err := arvadosclient.MakeArvadosClient()
668         c.Check(err, IsNil)
669         kc, _ := MakeKeepClient(arv)
670         arv.ApiToken = "abc123"
671         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
672
673         r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
674         c.Check(err, IsNil)
675         c.Check(n, Equals, int64(0))
676         c.Check(url2, Equals, "")
677         c.Assert(r, NotNil)
678         buf, err := ioutil.ReadAll(r)
679         c.Check(err, IsNil)
680         c.Check(buf, DeepEquals, []byte{})
681 }
682
683 func (s *StandaloneSuite) TestGetFail(c *C) {
684         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
685
686         st := FailHandler{make(chan string, 1)}
687
688         ks := RunFakeKeepServer(st)
689         defer ks.listener.Close()
690
691         arv, err := arvadosclient.MakeArvadosClient()
692         c.Check(err, IsNil)
693         kc, _ := MakeKeepClient(arv)
694         arv.ApiToken = "abc123"
695         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
696         kc.Retries = 0
697
698         r, n, url2, err := kc.Get(hash)
699         errNotFound, _ := err.(*ErrNotFound)
700         c.Check(errNotFound, NotNil)
701         c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
702         c.Check(errNotFound.Temporary(), Equals, true)
703         c.Check(n, Equals, int64(0))
704         c.Check(url2, Equals, "")
705         c.Check(r, Equals, nil)
706 }
707
708 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
709         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
710
711         st := &FailThenSucceedHandler{
712                 handled: make(chan string, 1),
713                 successhandler: StubGetHandler{
714                         c,
715                         hash,
716                         "abc123",
717                         http.StatusOK,
718                         []byte("foo")}}
719
720         ks := RunFakeKeepServer(st)
721         defer ks.listener.Close()
722
723         arv, err := arvadosclient.MakeArvadosClient()
724         c.Check(err, IsNil)
725         kc, _ := MakeKeepClient(arv)
726         arv.ApiToken = "abc123"
727         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
728
729         r, n, url2, err := kc.Get(hash)
730         defer r.Close()
731         c.Check(err, Equals, nil)
732         c.Check(n, Equals, int64(3))
733         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
734
735         content, err2 := ioutil.ReadAll(r)
736         c.Check(err2, Equals, nil)
737         c.Check(content, DeepEquals, []byte("foo"))
738
739         c.Logf("%q", st.reqIDs)
740         c.Assert(len(st.reqIDs) > 1, Equals, true)
741         for _, reqid := range st.reqIDs {
742                 c.Check(reqid, Not(Equals), "")
743                 c.Check(reqid, Equals, st.reqIDs[0])
744         }
745 }
746
747 func (s *StandaloneSuite) TestGetNetError(c *C) {
748         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
749
750         arv, err := arvadosclient.MakeArvadosClient()
751         c.Check(err, IsNil)
752         kc, _ := MakeKeepClient(arv)
753         arv.ApiToken = "abc123"
754         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
755
756         r, n, url2, err := kc.Get(hash)
757         errNotFound, _ := err.(*ErrNotFound)
758         c.Check(errNotFound, NotNil)
759         c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
760         c.Check(errNotFound.Temporary(), Equals, true)
761         c.Check(n, Equals, int64(0))
762         c.Check(url2, Equals, "")
763         c.Check(r, Equals, nil)
764 }
765
766 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
767         uuid := "zzzzz-bi6l4-123451234512345"
768         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
769
770         // This one shouldn't be used:
771         ks0 := RunFakeKeepServer(StubGetHandler{
772                 c,
773                 "error if used",
774                 "abc123",
775                 http.StatusOK,
776                 []byte("foo")})
777         defer ks0.listener.Close()
778         // This one should be used:
779         ks := RunFakeKeepServer(StubGetHandler{
780                 c,
781                 hash + "+K@" + uuid,
782                 "abc123",
783                 http.StatusOK,
784                 []byte("foo")})
785         defer ks.listener.Close()
786
787         arv, err := arvadosclient.MakeArvadosClient()
788         c.Check(err, IsNil)
789         kc, _ := MakeKeepClient(arv)
790         arv.ApiToken = "abc123"
791         kc.SetServiceRoots(
792                 map[string]string{"x": ks0.url},
793                 nil,
794                 map[string]string{uuid: ks.url})
795
796         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
797         defer r.Close()
798         c.Check(err, Equals, nil)
799         c.Check(n, Equals, int64(3))
800         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
801
802         content, err := ioutil.ReadAll(r)
803         c.Check(err, Equals, nil)
804         c.Check(content, DeepEquals, []byte("foo"))
805 }
806
807 // Use a service hint to fetch from a local disk service, overriding
808 // rendezvous probe order.
809 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
810         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
811         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
812
813         // This one shouldn't be used, although it appears first in
814         // rendezvous probe order:
815         ks0 := RunFakeKeepServer(StubGetHandler{
816                 c,
817                 "error if used",
818                 "abc123",
819                 http.StatusOK,
820                 []byte("foo")})
821         defer ks0.listener.Close()
822         // This one should be used:
823         ks := RunFakeKeepServer(StubGetHandler{
824                 c,
825                 hash + "+K@" + uuid,
826                 "abc123",
827                 http.StatusOK,
828                 []byte("foo")})
829         defer ks.listener.Close()
830
831         arv, err := arvadosclient.MakeArvadosClient()
832         c.Check(err, IsNil)
833         kc, _ := MakeKeepClient(arv)
834         arv.ApiToken = "abc123"
835         kc.SetServiceRoots(
836                 map[string]string{
837                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
838                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
839                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
840                         uuid:                          ks.url},
841                 nil,
842                 map[string]string{
843                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
844                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
845                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
846                         uuid:                          ks.url},
847         )
848
849         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
850         defer r.Close()
851         c.Check(err, Equals, nil)
852         c.Check(n, Equals, int64(3))
853         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
854
855         content, err := ioutil.ReadAll(r)
856         c.Check(err, Equals, nil)
857         c.Check(content, DeepEquals, []byte("foo"))
858 }
859
860 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
861         uuid := "zzzzz-bi6l4-123451234512345"
862         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
863
864         ksLocal := RunFakeKeepServer(StubGetHandler{
865                 c,
866                 hash + "+K@" + uuid,
867                 "abc123",
868                 http.StatusOK,
869                 []byte("foo")})
870         defer ksLocal.listener.Close()
871         ksGateway := RunFakeKeepServer(StubGetHandler{
872                 c,
873                 hash + "+K@" + uuid,
874                 "abc123",
875                 http.StatusInternalServerError,
876                 []byte("Error")})
877         defer ksGateway.listener.Close()
878
879         arv, err := arvadosclient.MakeArvadosClient()
880         c.Check(err, IsNil)
881         kc, _ := MakeKeepClient(arv)
882         arv.ApiToken = "abc123"
883         kc.SetServiceRoots(
884                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
885                 nil,
886                 map[string]string{uuid: ksGateway.url})
887
888         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
889         c.Assert(err, Equals, nil)
890         defer r.Close()
891         c.Check(n, Equals, int64(3))
892         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
893
894         content, err := ioutil.ReadAll(r)
895         c.Check(err, Equals, nil)
896         c.Check(content, DeepEquals, []byte("foo"))
897 }
898
// BarHandler is an HTTP handler stub that answers every request with
// the body "bar" and reports each handled request on a channel.
type BarHandler struct {
	// handled receives "http://<request host>" once per request.
	handled chan string
}

// ServeHTTP writes "bar" as the response body and then sends
// "http://" followed by req.Host on bh.handled. The send blocks
// until the channel has room or a receiver is ready.
func (bh BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	origin := fmt.Sprintf("http://%s", req.Host)
	resp.Write([]byte("bar"))
	bh.handled <- origin
}
907
908 func (s *StandaloneSuite) TestChecksum(c *C) {
909         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
910         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
911
912         st := BarHandler{make(chan string, 1)}
913
914         ks := RunFakeKeepServer(st)
915         defer ks.listener.Close()
916
917         arv, err := arvadosclient.MakeArvadosClient()
918         c.Check(err, IsNil)
919         kc, _ := MakeKeepClient(arv)
920         arv.ApiToken = "abc123"
921         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
922
923         r, n, _, err := kc.Get(barhash)
924         c.Check(err, IsNil)
925         _, err = ioutil.ReadAll(r)
926         c.Check(n, Equals, int64(3))
927         c.Check(err, Equals, nil)
928
929         <-st.handled
930
931         r, n, _, err = kc.Get(foohash)
932         c.Check(err, IsNil)
933         _, err = ioutil.ReadAll(r)
934         c.Check(n, Equals, int64(3))
935         c.Check(err, Equals, BadChecksum)
936
937         <-st.handled
938 }
939
940 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
941         content := []byte("waz")
942         hash := fmt.Sprintf("%x", md5.Sum(content))
943
944         fh := Error404Handler{
945                 make(chan string, 4)}
946
947         st := StubGetHandler{
948                 c,
949                 hash,
950                 "abc123",
951                 http.StatusOK,
952                 content}
953
954         arv, err := arvadosclient.MakeArvadosClient()
955         c.Check(err, IsNil)
956         kc, _ := MakeKeepClient(arv)
957         arv.ApiToken = "abc123"
958         localRoots := make(map[string]string)
959         writableLocalRoots := make(map[string]string)
960
961         ks1 := RunSomeFakeKeepServers(st, 1)
962         ks2 := RunSomeFakeKeepServers(fh, 4)
963
964         for i, k := range ks1 {
965                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
966                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
967                 defer k.listener.Close()
968         }
969         for i, k := range ks2 {
970                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
971                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
972                 defer k.listener.Close()
973         }
974
975         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
976         kc.Retries = 0
977
978         // This test works only if one of the failing services is
979         // attempted before the succeeding service. Otherwise,
980         // <-fh.handled below will just hang! (Probe order depends on
981         // the choice of block content "waz" and the UUIDs of the fake
982         // servers, so we just tried different strings until we found
983         // an example that passes this Assert.)
984         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
985
986         r, n, url2, err := kc.Get(hash)
987
988         <-fh.handled
989         c.Check(err, Equals, nil)
990         c.Check(n, Equals, int64(3))
991         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
992
993         readContent, err2 := ioutil.ReadAll(r)
994         c.Check(err2, Equals, nil)
995         c.Check(readContent, DeepEquals, content)
996 }
997
998 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
999         content := []byte("TestPutGetHead")
1000
1001         arv, err := arvadosclient.MakeArvadosClient()
1002         c.Check(err, IsNil)
1003         kc, err := MakeKeepClient(arv)
1004         c.Assert(err, Equals, nil)
1005
1006         hash := fmt.Sprintf("%x", md5.Sum(content))
1007
1008         {
1009                 n, _, err := kc.Ask(hash)
1010                 c.Check(err, Equals, BlockNotFound)
1011                 c.Check(n, Equals, int64(0))
1012         }
1013         {
1014                 hash2, replicas, err := kc.PutB(content)
1015                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
1016                 c.Check(replicas, Equals, 2)
1017                 c.Check(err, Equals, nil)
1018         }
1019         {
1020                 r, n, url2, err := kc.Get(hash)
1021                 c.Check(err, Equals, nil)
1022                 c.Check(n, Equals, int64(len(content)))
1023                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1024
1025                 readContent, err2 := ioutil.ReadAll(r)
1026                 c.Check(err2, Equals, nil)
1027                 c.Check(readContent, DeepEquals, content)
1028         }
1029         {
1030                 n, url2, err := kc.Ask(hash)
1031                 c.Check(err, Equals, nil)
1032                 c.Check(n, Equals, int64(len(content)))
1033                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1034         }
1035         {
1036                 loc, err := kc.LocalLocator(hash)
1037                 c.Check(err, Equals, nil)
1038                 c.Assert(len(loc) >= 32, Equals, true)
1039                 c.Check(loc[:32], Equals, hash[:32])
1040         }
1041         {
1042                 content := []byte("the perth county conspiracy")
1043                 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1044                 c.Check(loc, Equals, "")
1045                 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1046                 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
1047         }
1048 }
1049
// StubProxyHandler is an HTTP handler stub that reports two stored
// replicas for every request and announces each handled request on a
// channel.
type StubProxyHandler struct {
	// handled receives "http://<request host>" once per request.
	handled chan string
}

// ServeHTTP sets the X-Keep-Replicas-Stored response header to "2"
// and then sends "http://" followed by req.Host on ph.handled. It
// writes no body and no explicit status code.
func (ph StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	headers := resp.Header()
	headers.Set("X-Keep-Replicas-Stored", "2")

	origin := fmt.Sprintf("http://%s", req.Host)
	ph.handled <- origin
}
1058
1059 func (s *StandaloneSuite) TestPutProxy(c *C) {
1060         st := StubProxyHandler{make(chan string, 1)}
1061
1062         arv, err := arvadosclient.MakeArvadosClient()
1063         c.Check(err, IsNil)
1064         kc, _ := MakeKeepClient(arv)
1065
1066         kc.Want_replicas = 2
1067         arv.ApiToken = "abc123"
1068         localRoots := make(map[string]string)
1069         writableLocalRoots := make(map[string]string)
1070
1071         ks1 := RunSomeFakeKeepServers(st, 1)
1072
1073         for i, k := range ks1 {
1074                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1075                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1076                 defer k.listener.Close()
1077         }
1078
1079         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1080
1081         _, replicas, err := kc.PutB([]byte("foo"))
1082         <-st.handled
1083
1084         c.Check(err, Equals, nil)
1085         c.Check(replicas, Equals, 2)
1086 }
1087
1088 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1089         st := StubProxyHandler{make(chan string, 1)}
1090
1091         arv, err := arvadosclient.MakeArvadosClient()
1092         c.Check(err, IsNil)
1093         kc, _ := MakeKeepClient(arv)
1094
1095         kc.Want_replicas = 3
1096         arv.ApiToken = "abc123"
1097         localRoots := make(map[string]string)
1098         writableLocalRoots := make(map[string]string)
1099
1100         ks1 := RunSomeFakeKeepServers(st, 1)
1101
1102         for i, k := range ks1 {
1103                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1104                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1105                 defer k.listener.Close()
1106         }
1107         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1108
1109         _, replicas, err := kc.PutB([]byte("foo"))
1110         <-st.handled
1111
1112         c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1113         c.Check(replicas, Equals, 2)
1114 }
1115
1116 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1117         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1118         c.Check(err, Equals, nil)
1119         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1120         c.Check(l.Size, Equals, 3)
1121         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
1122 }
1123
1124 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1125         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1126         c.Check(err, Equals, nil)
1127         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1128         c.Check(l.Size, Equals, -1)
1129         c.Check(l.Hints, DeepEquals, []string{})
1130 }
1131
1132 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1133         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1134         c.Check(err, Equals, nil)
1135         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1136         c.Check(l.Size, Equals, -1)
1137         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
1138 }
1139
1140 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1141         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1142         l, err := MakeLocator(str)
1143         c.Check(err, Equals, nil)
1144         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1145         c.Check(l.Size, Equals, 3)
1146         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1147         c.Check(l.String(), Equals, str)
1148 }
1149
1150 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1151         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1152         c.Check(err, Equals, InvalidLocatorError)
1153 }
1154
1155 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1156         hash := Md5String("foo")
1157
1158         st := &StubPutHandler{
1159                 c:                    c,
1160                 expectPath:           hash,
1161                 expectAPIToken:       "abc123",
1162                 expectBody:           "foo",
1163                 expectStorageClass:   "",
1164                 returnStorageClasses: "",
1165                 handled:              make(chan string, 5),
1166         }
1167
1168         arv, _ := arvadosclient.MakeArvadosClient()
1169         kc, _ := MakeKeepClient(arv)
1170
1171         kc.Want_replicas = 2
1172         arv.ApiToken = "abc123"
1173         localRoots := make(map[string]string)
1174         writableLocalRoots := make(map[string]string)
1175
1176         ks := RunSomeFakeKeepServers(st, 5)
1177
1178         for i, k := range ks {
1179                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1180                 if i == 0 {
1181                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1182                 }
1183                 defer k.listener.Close()
1184         }
1185
1186         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1187
1188         _, replicas, err := kc.PutB([]byte("foo"))
1189
1190         c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1191         c.Check(replicas, Equals, 1)
1192
1193         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1194 }
1195
1196 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1197         hash := Md5String("foo")
1198
1199         st := &StubPutHandler{
1200                 c:                    c,
1201                 expectPath:           hash,
1202                 expectAPIToken:       "abc123",
1203                 expectBody:           "foo",
1204                 expectStorageClass:   "",
1205                 returnStorageClasses: "",
1206                 handled:              make(chan string, 5),
1207         }
1208
1209         arv, _ := arvadosclient.MakeArvadosClient()
1210         kc, _ := MakeKeepClient(arv)
1211
1212         kc.Want_replicas = 2
1213         arv.ApiToken = "abc123"
1214         localRoots := make(map[string]string)
1215         writableLocalRoots := make(map[string]string)
1216
1217         ks := RunSomeFakeKeepServers(st, 5)
1218
1219         for i, k := range ks {
1220                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1221                 defer k.listener.Close()
1222         }
1223
1224         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1225
1226         _, replicas, err := kc.PutB([]byte("foo"))
1227
1228         c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1229         c.Check(replicas, Equals, 0)
1230 }
1231
1232 type StubGetIndexHandler struct {
1233         c              *C
1234         expectPath     string
1235         expectAPIToken string
1236         httpStatus     int
1237         body           []byte
1238 }
1239
1240 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1241         h.c.Check(req.URL.Path, Equals, h.expectPath)
1242         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1243         resp.WriteHeader(h.httpStatus)
1244         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1245         resp.Write(h.body)
1246 }
1247
1248 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1249         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1250
1251         st := StubGetIndexHandler{
1252                 c,
1253                 "/index",
1254                 "abc123",
1255                 http.StatusOK,
1256                 []byte(hash + "+3 1443559274\n\n")}
1257
1258         ks := RunFakeKeepServer(st)
1259         defer ks.listener.Close()
1260
1261         arv, err := arvadosclient.MakeArvadosClient()
1262         c.Assert(err, IsNil)
1263         kc, err := MakeKeepClient(arv)
1264         c.Assert(err, IsNil)
1265         arv.ApiToken = "abc123"
1266         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1267
1268         r, err := kc.GetIndex("x", "")
1269         c.Check(err, IsNil)
1270
1271         content, err2 := ioutil.ReadAll(r)
1272         c.Check(err2, Equals, nil)
1273         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1274 }
1275
1276 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1277         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1278
1279         st := StubGetIndexHandler{
1280                 c,
1281                 "/index/" + hash[0:3],
1282                 "abc123",
1283                 http.StatusOK,
1284                 []byte(hash + "+3 1443559274\n\n")}
1285
1286         ks := RunFakeKeepServer(st)
1287         defer ks.listener.Close()
1288
1289         arv, err := arvadosclient.MakeArvadosClient()
1290         c.Check(err, IsNil)
1291         kc, _ := MakeKeepClient(arv)
1292         arv.ApiToken = "abc123"
1293         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1294
1295         r, err := kc.GetIndex("x", hash[0:3])
1296         c.Assert(err, Equals, nil)
1297
1298         content, err2 := ioutil.ReadAll(r)
1299         c.Check(err2, Equals, nil)
1300         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1301 }
1302
1303 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1304         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1305
1306         st := StubGetIndexHandler{
1307                 c,
1308                 "/index/" + hash[0:3],
1309                 "abc123",
1310                 http.StatusOK,
1311                 []byte(hash)}
1312
1313         ks := RunFakeKeepServer(st)
1314         defer ks.listener.Close()
1315
1316         arv, err := arvadosclient.MakeArvadosClient()
1317         c.Check(err, IsNil)
1318         kc, _ := MakeKeepClient(arv)
1319         arv.ApiToken = "abc123"
1320         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1321
1322         _, err = kc.GetIndex("x", hash[0:3])
1323         c.Check(err, Equals, ErrIncompleteIndex)
1324 }
1325
1326 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1327         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1328
1329         st := StubGetIndexHandler{
1330                 c,
1331                 "/index/" + hash[0:3],
1332                 "abc123",
1333                 http.StatusOK,
1334                 []byte(hash)}
1335
1336         ks := RunFakeKeepServer(st)
1337         defer ks.listener.Close()
1338
1339         arv, err := arvadosclient.MakeArvadosClient()
1340         c.Check(err, IsNil)
1341         kc, _ := MakeKeepClient(arv)
1342         arv.ApiToken = "abc123"
1343         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1344
1345         _, err = kc.GetIndex("y", hash[0:3])
1346         c.Check(err, Equals, ErrNoSuchKeepServer)
1347 }
1348
1349 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1350         st := StubGetIndexHandler{
1351                 c,
1352                 "/index/abcd",
1353                 "abc123",
1354                 http.StatusOK,
1355                 []byte("\n")}
1356
1357         ks := RunFakeKeepServer(st)
1358         defer ks.listener.Close()
1359
1360         arv, err := arvadosclient.MakeArvadosClient()
1361         c.Check(err, IsNil)
1362         kc, _ := MakeKeepClient(arv)
1363         arv.ApiToken = "abc123"
1364         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1365
1366         r, err := kc.GetIndex("x", "abcd")
1367         c.Check(err, Equals, nil)
1368
1369         content, err2 := ioutil.ReadAll(r)
1370         c.Check(err2, Equals, nil)
1371         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1372 }
1373
1374 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1375         st := &FailThenSucceedHandler{
1376                 handled: make(chan string, 1),
1377                 successhandler: &StubPutHandler{
1378                         c:                    c,
1379                         expectPath:           Md5String("foo"),
1380                         expectAPIToken:       "abc123",
1381                         expectBody:           "foo",
1382                         expectStorageClass:   "",
1383                         returnStorageClasses: "",
1384                         handled:              make(chan string, 5),
1385                 },
1386         }
1387
1388         arv, _ := arvadosclient.MakeArvadosClient()
1389         kc, _ := MakeKeepClient(arv)
1390
1391         kc.Want_replicas = 2
1392         arv.ApiToken = "abc123"
1393         localRoots := make(map[string]string)
1394         writableLocalRoots := make(map[string]string)
1395
1396         ks := RunSomeFakeKeepServers(st, 2)
1397
1398         for i, k := range ks {
1399                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1400                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1401                 defer k.listener.Close()
1402         }
1403
1404         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1405
1406         hash, replicas, err := kc.PutB([]byte("foo"))
1407
1408         c.Check(err, Equals, nil)
1409         c.Check(hash, Equals, "")
1410         c.Check(replicas, Equals, 2)
1411 }
1412
1413 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1414         arv, err := arvadosclient.MakeArvadosClient()
1415         c.Assert(err, Equals, nil)
1416
1417         // Add an additional "testblobstore" keepservice
1418         blobKeepService := make(arvadosclient.Dict)
1419         err = arv.Create("keep_services",
1420                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1421                         "service_host": "localhost",
1422                         "service_port": "21321",
1423                         "service_type": "testblobstore"}},
1424                 &blobKeepService)
1425         c.Assert(err, Equals, nil)
1426         defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1427         RefreshServiceDiscovery()
1428
1429         // Make a keepclient and ensure that the testblobstore is included
1430         kc, err := MakeKeepClient(arv)
1431         c.Assert(err, Equals, nil)
1432
1433         // verify kc.LocalRoots
1434         c.Check(len(kc.LocalRoots()), Equals, 3)
1435         for _, root := range kc.LocalRoots() {
1436                 c.Check(root, Matches, "http://localhost:\\d+")
1437         }
1438         c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1439
1440         // verify kc.GatewayRoots
1441         c.Check(len(kc.GatewayRoots()), Equals, 3)
1442         for _, root := range kc.GatewayRoots() {
1443                 c.Check(root, Matches, "http://localhost:\\d+")
1444         }
1445         c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1446
1447         // verify kc.WritableLocalRoots
1448         c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1449         for _, root := range kc.WritableLocalRoots() {
1450                 c.Check(root, Matches, "http://localhost:\\d+")
1451         }
1452         c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1453
1454         c.Assert(kc.replicasPerService, Equals, 0)
1455         c.Assert(kc.foundNonDiskSvc, Equals, true)
1456         c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
1457 }