Merge branch '18349-fed-request-id'
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "net"
16         "net/http"
17         "os"
18         "strings"
19         "sync"
20         "testing"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/arvadostest"
26         . "gopkg.in/check.v1"
27 )
28
29 // Gocheck boilerplate
30 func Test(t *testing.T) {
31         TestingT(t)
32 }
33
34 // Gocheck boilerplate
35 var _ = Suite(&ServerRequiredSuite{})
36 var _ = Suite(&StandaloneSuite{})
37
38 // Tests that require the Keep server running
39 type ServerRequiredSuite struct{}
40
41 // Standalone tests
42 type StandaloneSuite struct{}
43
44 func (s *StandaloneSuite) SetUpTest(c *C) {
45         RefreshServiceDiscovery()
46 }
47
// pythonDir returns the current working directory with
// "/../../python/tests" appended. An error from os.Getwd is ignored,
// in which case the working-directory part is whatever Getwd returned.
func pythonDir() string {
	wd, _ := os.Getwd()
	return fmt.Sprintf("%s/../../python/tests", wd)
}
52
53 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
54         arvadostest.StartKeep(2, false)
55 }
56
57 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
58         arvadostest.StopKeep(2)
59 }
60
61 func (s *ServerRequiredSuite) SetUpTest(c *C) {
62         RefreshServiceDiscovery()
63 }
64
65 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
66         arv, err := arvadosclient.MakeArvadosClient()
67         c.Assert(err, Equals, nil)
68
69         kc, err := MakeKeepClient(arv)
70
71         c.Assert(err, Equals, nil)
72         c.Check(len(kc.LocalRoots()), Equals, 2)
73         for _, root := range kc.LocalRoots() {
74                 c.Check(root, Matches, "http://localhost:\\d+")
75         }
76 }
77
78 func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) {
79         arv, err := arvadosclient.MakeArvadosClient()
80         c.Assert(err, IsNil)
81
82         cc, err := arv.ClusterConfig("StorageClasses")
83         c.Assert(err, IsNil)
84         c.Assert(cc, NotNil)
85         c.Assert(cc.(map[string]interface{})["default"], NotNil)
86
87         kc := New(arv)
88         c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"})
89 }
90
91 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
92         arv, err := arvadosclient.MakeArvadosClient()
93         c.Assert(err, IsNil)
94
95         kc, err := MakeKeepClient(arv)
96         c.Check(err, IsNil)
97         c.Assert(kc.Want_replicas, Equals, 2)
98
99         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
100         kc, err = MakeKeepClient(arv)
101         c.Check(err, IsNil)
102         c.Assert(kc.Want_replicas, Equals, 3)
103
104         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
105         kc, err = MakeKeepClient(arv)
106         c.Check(err, IsNil)
107         c.Assert(kc.Want_replicas, Equals, 1)
108 }
109
110 type StubPutHandler struct {
111         c                    *C
112         expectPath           string
113         expectAPIToken       string
114         expectBody           string
115         expectStorageClass   string
116         returnStorageClasses string
117         handled              chan string
118         requests             []*http.Request
119         mtx                  sync.Mutex
120 }
121
122 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
123         sph.mtx.Lock()
124         sph.requests = append(sph.requests, req)
125         sph.mtx.Unlock()
126         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
127         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
128         if sph.expectStorageClass != "*" {
129                 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
130         }
131         body, err := ioutil.ReadAll(req.Body)
132         sph.c.Check(err, Equals, nil)
133         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
134         resp.Header().Set("X-Keep-Replicas-Stored", "1")
135         if sph.returnStorageClasses != "" {
136                 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
137         }
138         resp.WriteHeader(200)
139         sph.handled <- fmt.Sprintf("http://%s", req.Host)
140 }
141
142 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
143         var err error
144         // If we don't explicitly bind it to localhost, ks.listener.Addr() will
145         // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
146         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
147         if err != nil {
148                 panic("Could not listen on any port")
149         }
150         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
151         go http.Serve(ks.listener, st)
152         return
153 }
154
155 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
156         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
157
158         ks := RunFakeKeepServer(st)
159         defer ks.listener.Close()
160
161         arv, _ := arvadosclient.MakeArvadosClient()
162         arv.ApiToken = "abc123"
163
164         kc, _ := MakeKeepClient(arv)
165
166         reader, writer := io.Pipe()
167         uploadStatusChan := make(chan uploadStatus)
168
169         f(kc, ks.url, reader, writer, uploadStatusChan)
170 }
171
172 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
173         log.Printf("TestUploadToStubKeepServer")
174
175         st := &StubPutHandler{
176                 c:                    c,
177                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
178                 expectAPIToken:       "abc123",
179                 expectBody:           "foo",
180                 expectStorageClass:   "",
181                 returnStorageClasses: "default=1",
182                 handled:              make(chan string),
183         }
184
185         UploadToStubHelper(c, st,
186                 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
187                         go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
188
189                         writer.Write([]byte("foo"))
190                         writer.Close()
191
192                         <-st.handled
193                         status := <-uploadStatusChan
194                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
195                 })
196 }
197
198 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
199         st := &StubPutHandler{
200                 c:                    c,
201                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
202                 expectAPIToken:       "abc123",
203                 expectBody:           "foo",
204                 expectStorageClass:   "",
205                 returnStorageClasses: "default=1",
206                 handled:              make(chan string),
207         }
208
209         UploadToStubHelper(c, st,
210                 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
211                         go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
212
213                         <-st.handled
214
215                         status := <-uploadStatusChan
216                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
217                 })
218 }
219
220 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
221         for _, trial := range []struct {
222                 respHeader string
223                 expectMap  map[string]int
224         }{
225                 {"", nil},
226                 {"foo=1", map[string]int{"foo": 1}},
227                 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
228                 {" =foo=1 ", nil},
229                 {"foo", nil},
230         } {
231                 st := &StubPutHandler{
232                         c:                    c,
233                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
234                         expectAPIToken:       "abc123",
235                         expectBody:           "foo",
236                         expectStorageClass:   "",
237                         returnStorageClasses: trial.respHeader,
238                         handled:              make(chan string),
239                 }
240
241                 UploadToStubHelper(c, st,
242                         func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
243                                 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
244
245                                 writer.Write([]byte("foo"))
246                                 writer.Close()
247
248                                 <-st.handled
249                                 status := <-uploadStatusChan
250                                 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
251                         })
252         }
253 }
254
255 func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) {
256         nServers := 5
257         for _, trial := range []struct {
258                 replicas      int
259                 clientClasses []string
260                 putClasses    []string
261                 minRequests   int
262                 maxRequests   int
263                 success       bool
264         }{
265                 // Talking to an older cluster (no default storage classes exported
266                 // config) and no other additional storage classes requirements.
267                 {1, nil, nil, 1, 1, true},
268                 {2, nil, nil, 2, 2, true},
269                 {3, nil, nil, 3, 3, true},
270                 {nServers*2 + 1, nil, nil, nServers, nServers, false},
271
272                 {1, []string{"class1"}, nil, 1, 1, true},
273                 {2, []string{"class1"}, nil, 2, 2, true},
274                 {3, []string{"class1"}, nil, 3, 3, true},
275                 {1, []string{"class1", "class2"}, nil, 1, 1, true},
276                 {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
277
278                 {1, nil, []string{"class1"}, 1, 1, true},
279                 {2, nil, []string{"class1"}, 2, 2, true},
280                 {3, nil, []string{"class1"}, 3, 3, true},
281                 {1, nil, []string{"class1", "class2"}, 1, 1, true},
282                 {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false},
283         } {
284                 c.Logf("%+v", trial)
285                 st := &StubPutHandler{
286                         c:                    c,
287                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
288                         expectAPIToken:       "abc123",
289                         expectBody:           "foo",
290                         expectStorageClass:   "*",
291                         returnStorageClasses: "", // Simulate old cluster without SC keep support
292                         handled:              make(chan string, 100),
293                 }
294                 ks := RunSomeFakeKeepServers(st, nServers)
295                 arv, _ := arvadosclient.MakeArvadosClient()
296                 kc, _ := MakeKeepClient(arv)
297                 kc.Want_replicas = trial.replicas
298                 kc.StorageClasses = trial.clientClasses
299                 kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults
300                 arv.ApiToken = "abc123"
301                 localRoots := make(map[string]string)
302                 writableLocalRoots := make(map[string]string)
303                 for i, k := range ks {
304                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
305                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
306                         defer k.listener.Close()
307                 }
308                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
309
310                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
311                         Data:           []byte("foo"),
312                         StorageClasses: trial.putClasses,
313                 })
314                 if trial.success {
315                         c.Check(err, IsNil)
316                 } else {
317                         c.Check(err, NotNil)
318                 }
319                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
320                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
321                 if trial.clientClasses == nil && trial.putClasses == nil {
322                         c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "")
323                 }
324         }
325 }
326
327 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
328         nServers := 5
329         for _, trial := range []struct {
330                 replicas       int
331                 defaultClasses []string
332                 clientClasses  []string // clientClasses takes precedence over defaultClasses
333                 putClasses     []string // putClasses takes precedence over clientClasses
334                 minRequests    int
335                 maxRequests    int
336                 success        bool
337         }{
338                 {1, []string{"class1"}, nil, nil, 1, 1, true},
339                 {2, []string{"class1"}, nil, nil, 1, 2, true},
340                 {3, []string{"class1"}, nil, nil, 2, 3, true},
341                 {1, []string{"class1", "class2"}, nil, nil, 1, 1, true},
342
343                 // defaultClasses doesn't matter when any of the others is specified.
344                 {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true},
345                 {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true},
346                 {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true},
347                 {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true},
348                 {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true},
349                 {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true},
350                 {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
351                 {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
352                 {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false},
353                 {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false},
354                 {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false},
355                 {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false},
356         } {
357                 c.Logf("%+v", trial)
358                 st := &StubPutHandler{
359                         c:                    c,
360                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
361                         expectAPIToken:       "abc123",
362                         expectBody:           "foo",
363                         expectStorageClass:   "*",
364                         returnStorageClasses: "class1=2, class2=2",
365                         handled:              make(chan string, 100),
366                 }
367                 ks := RunSomeFakeKeepServers(st, nServers)
368                 arv, _ := arvadosclient.MakeArvadosClient()
369                 kc, _ := MakeKeepClient(arv)
370                 kc.Want_replicas = trial.replicas
371                 kc.StorageClasses = trial.clientClasses
372                 kc.DefaultStorageClasses = trial.defaultClasses
373                 arv.ApiToken = "abc123"
374                 localRoots := make(map[string]string)
375                 writableLocalRoots := make(map[string]string)
376                 for i, k := range ks {
377                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
378                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
379                         defer k.listener.Close()
380                 }
381                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
382
383                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
384                         Data:           []byte("foo"),
385                         StorageClasses: trial.putClasses,
386                 })
387                 if trial.success {
388                         c.Check(err, IsNil)
389                 } else {
390                         c.Check(err, NotNil)
391                 }
392                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
393                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
394                 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) {
395                         // Max concurrency should be 1. First request
396                         // should have succeeded for class1. Second
397                         // request should only ask for class404.
398                         c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404")
399                 }
400         }
401 }
402
// FailHandler is an http.Handler that answers every request with
// HTTP 500.
type FailHandler struct {
	// handled receives "http://<request host>" after each response.
	handled chan string
}

// ServeHTTP writes a 500 status and then reports the request's host on
// fh.handled; the send blocks until a receiver (or buffer space) is
// available.
func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
411
// FailThenSucceedHandler is an http.Handler that fails the first
// request it sees with HTTP 500 and delegates every later request to
// successhandler.
type FailThenSucceedHandler struct {
	// handled receives "http://<request host>" for the failed request.
	handled chan string
	// count is zero until the first (failed) request has been served.
	count int
	// successhandler serves every request after the first.
	successhandler http.Handler
	// reqIDs collects the X-Request-Id header of every request, in
	// arrival order.
	reqIDs []string
}

// ServeHTTP records the request's X-Request-Id header. While count is
// still zero it responds 500, increments count and reports the host on
// fh.handled; otherwise it hands the request to fh.successhandler.
func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
	if fh.count != 0 {
		fh.successhandler.ServeHTTP(resp, req)
		return
	}
	resp.WriteHeader(http.StatusInternalServerError)
	fh.count++
	fh.handled <- fmt.Sprintf("http://%s", req.Host)
}
429
// Error404Handler is an http.Handler that answers every request with
// HTTP 404.
type Error404Handler struct {
	// handled receives "http://<request host>" after each response.
	handled chan string
}

// ServeHTTP writes a 404 status and then reports the request's host on
// fh.handled; the send blocks until a receiver (or buffer space) is
// available.
func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusNotFound)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
438
439 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
440         st := FailHandler{
441                 make(chan string)}
442
443         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
444
445         UploadToStubHelper(c, st,
446                 func(kc *KeepClient, url string, reader io.ReadCloser,
447                         writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
448
449                         go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
450
451                         writer.Write([]byte("foo"))
452                         writer.Close()
453
454                         <-st.handled
455
456                         status := <-uploadStatusChan
457                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
458                         c.Check(status.statusCode, Equals, 500)
459                 })
460 }
461
// KeepServer describes one fake keep server started by
// RunFakeKeepServer.
type KeepServer struct {
	// listener is the socket the fake server accepts connections on.
	listener net.Listener
	// url is "http://" followed by the listener's address.
	url string
}
466
467 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
468         ks = make([]KeepServer, n)
469
470         for i := 0; i < n; i++ {
471                 ks[i] = RunFakeKeepServer(st)
472         }
473
474         return ks
475 }
476
477 func (s *StandaloneSuite) TestPutB(c *C) {
478         hash := Md5String("foo")
479
480         st := &StubPutHandler{
481                 c:                    c,
482                 expectPath:           hash,
483                 expectAPIToken:       "abc123",
484                 expectBody:           "foo",
485                 expectStorageClass:   "default",
486                 returnStorageClasses: "",
487                 handled:              make(chan string, 5),
488         }
489
490         arv, _ := arvadosclient.MakeArvadosClient()
491         kc, _ := MakeKeepClient(arv)
492
493         kc.Want_replicas = 2
494         arv.ApiToken = "abc123"
495         localRoots := make(map[string]string)
496         writableLocalRoots := make(map[string]string)
497
498         ks := RunSomeFakeKeepServers(st, 5)
499
500         for i, k := range ks {
501                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
502                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
503                 defer k.listener.Close()
504         }
505
506         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
507
508         kc.PutB([]byte("foo"))
509
510         shuff := NewRootSorter(
511                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
512
513         s1 := <-st.handled
514         s2 := <-st.handled
515         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
516                 (s1 == shuff[1] && s2 == shuff[0]),
517                 Equals,
518                 true)
519 }
520
521 func (s *StandaloneSuite) TestPutHR(c *C) {
522         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
523
524         st := &StubPutHandler{
525                 c:                    c,
526                 expectPath:           hash,
527                 expectAPIToken:       "abc123",
528                 expectBody:           "foo",
529                 expectStorageClass:   "default",
530                 returnStorageClasses: "",
531                 handled:              make(chan string, 5),
532         }
533
534         arv, _ := arvadosclient.MakeArvadosClient()
535         kc, _ := MakeKeepClient(arv)
536
537         kc.Want_replicas = 2
538         arv.ApiToken = "abc123"
539         localRoots := make(map[string]string)
540         writableLocalRoots := make(map[string]string)
541
542         ks := RunSomeFakeKeepServers(st, 5)
543
544         for i, k := range ks {
545                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
546                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
547                 defer k.listener.Close()
548         }
549
550         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
551
552         reader, writer := io.Pipe()
553
554         go func() {
555                 writer.Write([]byte("foo"))
556                 writer.Close()
557         }()
558
559         kc.PutHR(hash, reader, 3)
560
561         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
562
563         s1 := <-st.handled
564         s2 := <-st.handled
565
566         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
567                 (s1 == shuff[1] && s2 == shuff[0]),
568                 Equals,
569                 true)
570 }
571
572 func (s *StandaloneSuite) TestPutWithFail(c *C) {
573         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
574
575         st := &StubPutHandler{
576                 c:                    c,
577                 expectPath:           hash,
578                 expectAPIToken:       "abc123",
579                 expectBody:           "foo",
580                 expectStorageClass:   "default",
581                 returnStorageClasses: "",
582                 handled:              make(chan string, 4),
583         }
584
585         fh := FailHandler{
586                 make(chan string, 1)}
587
588         arv, err := arvadosclient.MakeArvadosClient()
589         c.Check(err, IsNil)
590         kc, _ := MakeKeepClient(arv)
591
592         kc.Want_replicas = 2
593         arv.ApiToken = "abc123"
594         localRoots := make(map[string]string)
595         writableLocalRoots := make(map[string]string)
596
597         ks1 := RunSomeFakeKeepServers(st, 4)
598         ks2 := RunSomeFakeKeepServers(fh, 1)
599
600         for i, k := range ks1 {
601                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
602                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
603                 defer k.listener.Close()
604         }
605         for i, k := range ks2 {
606                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
607                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
608                 defer k.listener.Close()
609         }
610
611         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
612
613         shuff := NewRootSorter(
614                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
615         c.Logf("%+v", shuff)
616
617         phash, replicas, err := kc.PutB([]byte("foo"))
618
619         <-fh.handled
620
621         c.Check(err, Equals, nil)
622         c.Check(phash, Equals, "")
623         c.Check(replicas, Equals, 2)
624
625         s1 := <-st.handled
626         s2 := <-st.handled
627
628         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
629                 (s1 == shuff[2] && s2 == shuff[1]),
630                 Equals,
631                 true)
632 }
633
634 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
635         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
636
637         st := &StubPutHandler{
638                 c:                    c,
639                 expectPath:           hash,
640                 expectAPIToken:       "abc123",
641                 expectBody:           "foo",
642                 expectStorageClass:   "default",
643                 returnStorageClasses: "",
644                 handled:              make(chan string, 1),
645         }
646
647         fh := FailHandler{
648                 make(chan string, 4)}
649
650         arv, err := arvadosclient.MakeArvadosClient()
651         c.Check(err, IsNil)
652         kc, _ := MakeKeepClient(arv)
653
654         kc.Want_replicas = 2
655         kc.Retries = 0
656         arv.ApiToken = "abc123"
657         localRoots := make(map[string]string)
658         writableLocalRoots := make(map[string]string)
659
660         ks1 := RunSomeFakeKeepServers(st, 1)
661         ks2 := RunSomeFakeKeepServers(fh, 4)
662
663         for i, k := range ks1 {
664                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
665                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
666                 defer k.listener.Close()
667         }
668         for i, k := range ks2 {
669                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
670                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
671                 defer k.listener.Close()
672         }
673
674         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
675
676         _, replicas, err := kc.PutB([]byte("foo"))
677
678         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
679         c.Check(replicas, Equals, 1)
680         c.Check(<-st.handled, Equals, ks1[0].url)
681 }
682
683 type StubGetHandler struct {
684         c              *C
685         expectPath     string
686         expectAPIToken string
687         httpStatus     int
688         body           []byte
689 }
690
691 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
692         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
693         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
694         resp.WriteHeader(sgh.httpStatus)
695         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
696         resp.Write(sgh.body)
697 }
698
699 func (s *StandaloneSuite) TestGet(c *C) {
700         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
701
702         st := StubGetHandler{
703                 c,
704                 hash,
705                 "abc123",
706                 http.StatusOK,
707                 []byte("foo")}
708
709         ks := RunFakeKeepServer(st)
710         defer ks.listener.Close()
711
712         arv, err := arvadosclient.MakeArvadosClient()
713         c.Check(err, IsNil)
714         kc, _ := MakeKeepClient(arv)
715         arv.ApiToken = "abc123"
716         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
717
718         r, n, url2, err := kc.Get(hash)
719         defer r.Close()
720         c.Check(err, Equals, nil)
721         c.Check(n, Equals, int64(3))
722         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
723
724         content, err2 := ioutil.ReadAll(r)
725         c.Check(err2, Equals, nil)
726         c.Check(content, DeepEquals, []byte("foo"))
727 }
728
729 func (s *StandaloneSuite) TestGet404(c *C) {
730         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
731
732         st := Error404Handler{make(chan string, 1)}
733
734         ks := RunFakeKeepServer(st)
735         defer ks.listener.Close()
736
737         arv, err := arvadosclient.MakeArvadosClient()
738         c.Check(err, IsNil)
739         kc, _ := MakeKeepClient(arv)
740         arv.ApiToken = "abc123"
741         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
742
743         r, n, url2, err := kc.Get(hash)
744         c.Check(err, Equals, BlockNotFound)
745         c.Check(n, Equals, int64(0))
746         c.Check(url2, Equals, "")
747         c.Check(r, Equals, nil)
748 }
749
750 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
751         st := Error404Handler{make(chan string, 1)}
752
753         ks := RunFakeKeepServer(st)
754         defer ks.listener.Close()
755
756         arv, err := arvadosclient.MakeArvadosClient()
757         c.Check(err, IsNil)
758         kc, _ := MakeKeepClient(arv)
759         arv.ApiToken = "abc123"
760         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
761
762         r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
763         c.Check(err, IsNil)
764         c.Check(n, Equals, int64(0))
765         c.Check(url2, Equals, "")
766         c.Assert(r, NotNil)
767         buf, err := ioutil.ReadAll(r)
768         c.Check(err, IsNil)
769         c.Check(buf, DeepEquals, []byte{})
770 }
771
772 func (s *StandaloneSuite) TestGetFail(c *C) {
773         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
774
775         st := FailHandler{make(chan string, 1)}
776
777         ks := RunFakeKeepServer(st)
778         defer ks.listener.Close()
779
780         arv, err := arvadosclient.MakeArvadosClient()
781         c.Check(err, IsNil)
782         kc, _ := MakeKeepClient(arv)
783         arv.ApiToken = "abc123"
784         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
785         kc.Retries = 0
786
787         r, n, url2, err := kc.Get(hash)
788         errNotFound, _ := err.(*ErrNotFound)
789         c.Check(errNotFound, NotNil)
790         c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
791         c.Check(errNotFound.Temporary(), Equals, true)
792         c.Check(n, Equals, int64(0))
793         c.Check(url2, Equals, "")
794         c.Check(r, Equals, nil)
795 }
796
797 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
798         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
799
800         st := &FailThenSucceedHandler{
801                 handled: make(chan string, 1),
802                 successhandler: StubGetHandler{
803                         c,
804                         hash,
805                         "abc123",
806                         http.StatusOK,
807                         []byte("foo")}}
808
809         ks := RunFakeKeepServer(st)
810         defer ks.listener.Close()
811
812         arv, err := arvadosclient.MakeArvadosClient()
813         c.Check(err, IsNil)
814         kc, _ := MakeKeepClient(arv)
815         arv.ApiToken = "abc123"
816         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
817
818         r, n, url2, err := kc.Get(hash)
819         defer r.Close()
820         c.Check(err, Equals, nil)
821         c.Check(n, Equals, int64(3))
822         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
823
824         content, err2 := ioutil.ReadAll(r)
825         c.Check(err2, Equals, nil)
826         c.Check(content, DeepEquals, []byte("foo"))
827
828         c.Logf("%q", st.reqIDs)
829         c.Assert(len(st.reqIDs) > 1, Equals, true)
830         for _, reqid := range st.reqIDs {
831                 c.Check(reqid, Not(Equals), "")
832                 c.Check(reqid, Equals, st.reqIDs[0])
833         }
834 }
835
836 func (s *StandaloneSuite) TestGetNetError(c *C) {
837         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
838
839         arv, err := arvadosclient.MakeArvadosClient()
840         c.Check(err, IsNil)
841         kc, _ := MakeKeepClient(arv)
842         arv.ApiToken = "abc123"
843         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
844
845         r, n, url2, err := kc.Get(hash)
846         errNotFound, _ := err.(*ErrNotFound)
847         c.Check(errNotFound, NotNil)
848         c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
849         c.Check(errNotFound.Temporary(), Equals, true)
850         c.Check(n, Equals, int64(0))
851         c.Check(url2, Equals, "")
852         c.Check(r, Equals, nil)
853 }
854
855 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
856         uuid := "zzzzz-bi6l4-123451234512345"
857         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
858
859         // This one shouldn't be used:
860         ks0 := RunFakeKeepServer(StubGetHandler{
861                 c,
862                 "error if used",
863                 "abc123",
864                 http.StatusOK,
865                 []byte("foo")})
866         defer ks0.listener.Close()
867         // This one should be used:
868         ks := RunFakeKeepServer(StubGetHandler{
869                 c,
870                 hash + "+K@" + uuid,
871                 "abc123",
872                 http.StatusOK,
873                 []byte("foo")})
874         defer ks.listener.Close()
875
876         arv, err := arvadosclient.MakeArvadosClient()
877         c.Check(err, IsNil)
878         kc, _ := MakeKeepClient(arv)
879         arv.ApiToken = "abc123"
880         kc.SetServiceRoots(
881                 map[string]string{"x": ks0.url},
882                 nil,
883                 map[string]string{uuid: ks.url})
884
885         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
886         defer r.Close()
887         c.Check(err, Equals, nil)
888         c.Check(n, Equals, int64(3))
889         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
890
891         content, err := ioutil.ReadAll(r)
892         c.Check(err, Equals, nil)
893         c.Check(content, DeepEquals, []byte("foo"))
894 }
895
896 // Use a service hint to fetch from a local disk service, overriding
897 // rendezvous probe order.
898 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
899         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
900         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
901
902         // This one shouldn't be used, although it appears first in
903         // rendezvous probe order:
904         ks0 := RunFakeKeepServer(StubGetHandler{
905                 c,
906                 "error if used",
907                 "abc123",
908                 http.StatusOK,
909                 []byte("foo")})
910         defer ks0.listener.Close()
911         // This one should be used:
912         ks := RunFakeKeepServer(StubGetHandler{
913                 c,
914                 hash + "+K@" + uuid,
915                 "abc123",
916                 http.StatusOK,
917                 []byte("foo")})
918         defer ks.listener.Close()
919
920         arv, err := arvadosclient.MakeArvadosClient()
921         c.Check(err, IsNil)
922         kc, _ := MakeKeepClient(arv)
923         arv.ApiToken = "abc123"
924         kc.SetServiceRoots(
925                 map[string]string{
926                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
927                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
928                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
929                         uuid:                          ks.url},
930                 nil,
931                 map[string]string{
932                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
933                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
934                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
935                         uuid:                          ks.url},
936         )
937
938         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
939         defer r.Close()
940         c.Check(err, Equals, nil)
941         c.Check(n, Equals, int64(3))
942         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
943
944         content, err := ioutil.ReadAll(r)
945         c.Check(err, Equals, nil)
946         c.Check(content, DeepEquals, []byte("foo"))
947 }
948
949 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
950         uuid := "zzzzz-bi6l4-123451234512345"
951         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
952
953         ksLocal := RunFakeKeepServer(StubGetHandler{
954                 c,
955                 hash + "+K@" + uuid,
956                 "abc123",
957                 http.StatusOK,
958                 []byte("foo")})
959         defer ksLocal.listener.Close()
960         ksGateway := RunFakeKeepServer(StubGetHandler{
961                 c,
962                 hash + "+K@" + uuid,
963                 "abc123",
964                 http.StatusInternalServerError,
965                 []byte("Error")})
966         defer ksGateway.listener.Close()
967
968         arv, err := arvadosclient.MakeArvadosClient()
969         c.Check(err, IsNil)
970         kc, _ := MakeKeepClient(arv)
971         arv.ApiToken = "abc123"
972         kc.SetServiceRoots(
973                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
974                 nil,
975                 map[string]string{uuid: ksGateway.url})
976
977         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
978         c.Assert(err, Equals, nil)
979         defer r.Close()
980         c.Check(n, Equals, int64(3))
981         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
982
983         content, err := ioutil.ReadAll(r)
984         c.Check(err, Equals, nil)
985         c.Check(content, DeepEquals, []byte("foo"))
986 }
987
// BarHandler is a stub HTTP handler that answers every request with the
// body "bar" and reports each request it has served on a channel.
type BarHandler struct {
	// handled receives "http://<request host>" once per request served.
	handled chan string
}

// ServeHTTP writes "bar" as the response body and then sends the request
// host, formatted as an http:// URL, on h.handled. The send blocks while
// the channel has neither free buffer space nor a waiting receiver.
func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	payload := []byte("bar")
	resp.Write(payload)

	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
996
997 func (s *StandaloneSuite) TestChecksum(c *C) {
998         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
999         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
1000
1001         st := BarHandler{make(chan string, 1)}
1002
1003         ks := RunFakeKeepServer(st)
1004         defer ks.listener.Close()
1005
1006         arv, err := arvadosclient.MakeArvadosClient()
1007         c.Check(err, IsNil)
1008         kc, _ := MakeKeepClient(arv)
1009         arv.ApiToken = "abc123"
1010         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1011
1012         r, n, _, err := kc.Get(barhash)
1013         c.Check(err, IsNil)
1014         _, err = ioutil.ReadAll(r)
1015         c.Check(n, Equals, int64(3))
1016         c.Check(err, Equals, nil)
1017
1018         <-st.handled
1019
1020         r, n, _, err = kc.Get(foohash)
1021         c.Check(err, IsNil)
1022         _, err = ioutil.ReadAll(r)
1023         c.Check(n, Equals, int64(3))
1024         c.Check(err, Equals, BadChecksum)
1025
1026         <-st.handled
1027 }
1028
1029 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
1030         content := []byte("waz")
1031         hash := fmt.Sprintf("%x", md5.Sum(content))
1032
1033         fh := Error404Handler{
1034                 make(chan string, 4)}
1035
1036         st := StubGetHandler{
1037                 c,
1038                 hash,
1039                 "abc123",
1040                 http.StatusOK,
1041                 content}
1042
1043         arv, err := arvadosclient.MakeArvadosClient()
1044         c.Check(err, IsNil)
1045         kc, _ := MakeKeepClient(arv)
1046         arv.ApiToken = "abc123"
1047         localRoots := make(map[string]string)
1048         writableLocalRoots := make(map[string]string)
1049
1050         ks1 := RunSomeFakeKeepServers(st, 1)
1051         ks2 := RunSomeFakeKeepServers(fh, 4)
1052
1053         for i, k := range ks1 {
1054                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1055                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1056                 defer k.listener.Close()
1057         }
1058         for i, k := range ks2 {
1059                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1060                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1061                 defer k.listener.Close()
1062         }
1063
1064         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1065         kc.Retries = 0
1066
1067         // This test works only if one of the failing services is
1068         // attempted before the succeeding service. Otherwise,
1069         // <-fh.handled below will just hang! (Probe order depends on
1070         // the choice of block content "waz" and the UUIDs of the fake
1071         // servers, so we just tried different strings until we found
1072         // an example that passes this Assert.)
1073         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
1074
1075         r, n, url2, err := kc.Get(hash)
1076
1077         <-fh.handled
1078         c.Check(err, Equals, nil)
1079         c.Check(n, Equals, int64(3))
1080         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
1081
1082         readContent, err2 := ioutil.ReadAll(r)
1083         c.Check(err2, Equals, nil)
1084         c.Check(readContent, DeepEquals, content)
1085 }
1086
1087 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
1088         content := []byte("TestPutGetHead")
1089
1090         arv, err := arvadosclient.MakeArvadosClient()
1091         c.Check(err, IsNil)
1092         kc, err := MakeKeepClient(arv)
1093         c.Assert(err, Equals, nil)
1094
1095         hash := fmt.Sprintf("%x", md5.Sum(content))
1096
1097         {
1098                 n, _, err := kc.Ask(hash)
1099                 c.Check(err, Equals, BlockNotFound)
1100                 c.Check(n, Equals, int64(0))
1101         }
1102         {
1103                 hash2, replicas, err := kc.PutB(content)
1104                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
1105                 c.Check(replicas, Equals, 2)
1106                 c.Check(err, Equals, nil)
1107         }
1108         {
1109                 r, n, url2, err := kc.Get(hash)
1110                 c.Check(err, Equals, nil)
1111                 c.Check(n, Equals, int64(len(content)))
1112                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1113
1114                 readContent, err2 := ioutil.ReadAll(r)
1115                 c.Check(err2, Equals, nil)
1116                 c.Check(readContent, DeepEquals, content)
1117         }
1118         {
1119                 n, url2, err := kc.Ask(hash)
1120                 c.Check(err, Equals, nil)
1121                 c.Check(n, Equals, int64(len(content)))
1122                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1123         }
1124         {
1125                 loc, err := kc.LocalLocator(hash)
1126                 c.Check(err, Equals, nil)
1127                 c.Assert(len(loc) >= 32, Equals, true)
1128                 c.Check(loc[:32], Equals, hash[:32])
1129         }
1130         {
1131                 content := []byte("the perth county conspiracy")
1132                 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1133                 c.Check(loc, Equals, "")
1134                 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1135                 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
1136         }
1137 }
1138
// StubProxyHandler is a stub HTTP handler that reports two stored
// replicas for every request and records each request on a channel.
type StubProxyHandler struct {
	// handled receives "http://<request host>" once per request served.
	handled chan string
}

// ServeHTTP sets the X-Keep-Replicas-Stored response header to "2" and
// then sends the request host, formatted as an http:// URL, on h.handled.
// It writes no body and no explicit status code.
func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	hdr := resp.Header()
	hdr.Set("X-Keep-Replicas-Stored", "2")

	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
1147
1148 func (s *StandaloneSuite) TestPutProxy(c *C) {
1149         st := StubProxyHandler{make(chan string, 1)}
1150
1151         arv, err := arvadosclient.MakeArvadosClient()
1152         c.Check(err, IsNil)
1153         kc, _ := MakeKeepClient(arv)
1154
1155         kc.Want_replicas = 2
1156         arv.ApiToken = "abc123"
1157         localRoots := make(map[string]string)
1158         writableLocalRoots := make(map[string]string)
1159
1160         ks1 := RunSomeFakeKeepServers(st, 1)
1161
1162         for i, k := range ks1 {
1163                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1164                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1165                 defer k.listener.Close()
1166         }
1167
1168         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1169
1170         _, replicas, err := kc.PutB([]byte("foo"))
1171         <-st.handled
1172
1173         c.Check(err, Equals, nil)
1174         c.Check(replicas, Equals, 2)
1175 }
1176
1177 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1178         st := StubProxyHandler{make(chan string, 1)}
1179
1180         arv, err := arvadosclient.MakeArvadosClient()
1181         c.Check(err, IsNil)
1182         kc, _ := MakeKeepClient(arv)
1183
1184         kc.Want_replicas = 3
1185         arv.ApiToken = "abc123"
1186         localRoots := make(map[string]string)
1187         writableLocalRoots := make(map[string]string)
1188
1189         ks1 := RunSomeFakeKeepServers(st, 1)
1190
1191         for i, k := range ks1 {
1192                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1193                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1194                 defer k.listener.Close()
1195         }
1196         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1197
1198         _, replicas, err := kc.PutB([]byte("foo"))
1199         <-st.handled
1200
1201         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1202         c.Check(replicas, Equals, 2)
1203 }
1204
1205 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1206         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1207         c.Check(err, Equals, nil)
1208         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1209         c.Check(l.Size, Equals, 3)
1210         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
1211 }
1212
1213 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1214         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1215         c.Check(err, Equals, nil)
1216         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1217         c.Check(l.Size, Equals, -1)
1218         c.Check(l.Hints, DeepEquals, []string{})
1219 }
1220
1221 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1222         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1223         c.Check(err, Equals, nil)
1224         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1225         c.Check(l.Size, Equals, -1)
1226         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
1227 }
1228
1229 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1230         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1231         l, err := MakeLocator(str)
1232         c.Check(err, Equals, nil)
1233         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1234         c.Check(l.Size, Equals, 3)
1235         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1236         c.Check(l.String(), Equals, str)
1237 }
1238
1239 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1240         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1241         c.Check(err, Equals, InvalidLocatorError)
1242 }
1243
1244 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1245         hash := Md5String("foo")
1246
1247         st := &StubPutHandler{
1248                 c:                    c,
1249                 expectPath:           hash,
1250                 expectAPIToken:       "abc123",
1251                 expectBody:           "foo",
1252                 expectStorageClass:   "default",
1253                 returnStorageClasses: "",
1254                 handled:              make(chan string, 5),
1255         }
1256
1257         arv, _ := arvadosclient.MakeArvadosClient()
1258         kc, _ := MakeKeepClient(arv)
1259
1260         kc.Want_replicas = 2
1261         arv.ApiToken = "abc123"
1262         localRoots := make(map[string]string)
1263         writableLocalRoots := make(map[string]string)
1264
1265         ks := RunSomeFakeKeepServers(st, 5)
1266
1267         for i, k := range ks {
1268                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1269                 if i == 0 {
1270                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1271                 }
1272                 defer k.listener.Close()
1273         }
1274
1275         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1276
1277         _, replicas, err := kc.PutB([]byte("foo"))
1278
1279         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1280         c.Check(replicas, Equals, 1)
1281
1282         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1283 }
1284
1285 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1286         hash := Md5String("foo")
1287
1288         st := &StubPutHandler{
1289                 c:                    c,
1290                 expectPath:           hash,
1291                 expectAPIToken:       "abc123",
1292                 expectBody:           "foo",
1293                 expectStorageClass:   "",
1294                 returnStorageClasses: "",
1295                 handled:              make(chan string, 5),
1296         }
1297
1298         arv, _ := arvadosclient.MakeArvadosClient()
1299         kc, _ := MakeKeepClient(arv)
1300
1301         kc.Want_replicas = 2
1302         arv.ApiToken = "abc123"
1303         localRoots := make(map[string]string)
1304         writableLocalRoots := make(map[string]string)
1305
1306         ks := RunSomeFakeKeepServers(st, 5)
1307
1308         for i, k := range ks {
1309                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1310                 defer k.listener.Close()
1311         }
1312
1313         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1314
1315         _, replicas, err := kc.PutB([]byte("foo"))
1316
1317         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1318         c.Check(replicas, Equals, 0)
1319 }
1320
1321 type StubGetIndexHandler struct {
1322         c              *C
1323         expectPath     string
1324         expectAPIToken string
1325         httpStatus     int
1326         body           []byte
1327 }
1328
1329 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1330         h.c.Check(req.URL.Path, Equals, h.expectPath)
1331         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1332         resp.WriteHeader(h.httpStatus)
1333         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1334         resp.Write(h.body)
1335 }
1336
1337 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1338         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1339
1340         st := StubGetIndexHandler{
1341                 c,
1342                 "/index",
1343                 "abc123",
1344                 http.StatusOK,
1345                 []byte(hash + "+3 1443559274\n\n")}
1346
1347         ks := RunFakeKeepServer(st)
1348         defer ks.listener.Close()
1349
1350         arv, err := arvadosclient.MakeArvadosClient()
1351         c.Assert(err, IsNil)
1352         kc, err := MakeKeepClient(arv)
1353         c.Assert(err, IsNil)
1354         arv.ApiToken = "abc123"
1355         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1356
1357         r, err := kc.GetIndex("x", "")
1358         c.Check(err, IsNil)
1359
1360         content, err2 := ioutil.ReadAll(r)
1361         c.Check(err2, Equals, nil)
1362         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1363 }
1364
1365 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1366         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1367
1368         st := StubGetIndexHandler{
1369                 c,
1370                 "/index/" + hash[0:3],
1371                 "abc123",
1372                 http.StatusOK,
1373                 []byte(hash + "+3 1443559274\n\n")}
1374
1375         ks := RunFakeKeepServer(st)
1376         defer ks.listener.Close()
1377
1378         arv, err := arvadosclient.MakeArvadosClient()
1379         c.Check(err, IsNil)
1380         kc, _ := MakeKeepClient(arv)
1381         arv.ApiToken = "abc123"
1382         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1383
1384         r, err := kc.GetIndex("x", hash[0:3])
1385         c.Assert(err, Equals, nil)
1386
1387         content, err2 := ioutil.ReadAll(r)
1388         c.Check(err2, Equals, nil)
1389         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1390 }
1391
1392 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1393         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1394
1395         st := StubGetIndexHandler{
1396                 c,
1397                 "/index/" + hash[0:3],
1398                 "abc123",
1399                 http.StatusOK,
1400                 []byte(hash)}
1401
1402         ks := RunFakeKeepServer(st)
1403         defer ks.listener.Close()
1404
1405         arv, err := arvadosclient.MakeArvadosClient()
1406         c.Check(err, IsNil)
1407         kc, _ := MakeKeepClient(arv)
1408         arv.ApiToken = "abc123"
1409         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1410
1411         _, err = kc.GetIndex("x", hash[0:3])
1412         c.Check(err, Equals, ErrIncompleteIndex)
1413 }
1414
1415 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1416         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1417
1418         st := StubGetIndexHandler{
1419                 c,
1420                 "/index/" + hash[0:3],
1421                 "abc123",
1422                 http.StatusOK,
1423                 []byte(hash)}
1424
1425         ks := RunFakeKeepServer(st)
1426         defer ks.listener.Close()
1427
1428         arv, err := arvadosclient.MakeArvadosClient()
1429         c.Check(err, IsNil)
1430         kc, _ := MakeKeepClient(arv)
1431         arv.ApiToken = "abc123"
1432         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1433
1434         _, err = kc.GetIndex("y", hash[0:3])
1435         c.Check(err, Equals, ErrNoSuchKeepServer)
1436 }
1437
1438 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1439         st := StubGetIndexHandler{
1440                 c,
1441                 "/index/abcd",
1442                 "abc123",
1443                 http.StatusOK,
1444                 []byte("\n")}
1445
1446         ks := RunFakeKeepServer(st)
1447         defer ks.listener.Close()
1448
1449         arv, err := arvadosclient.MakeArvadosClient()
1450         c.Check(err, IsNil)
1451         kc, _ := MakeKeepClient(arv)
1452         arv.ApiToken = "abc123"
1453         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1454
1455         r, err := kc.GetIndex("x", "abcd")
1456         c.Check(err, Equals, nil)
1457
1458         content, err2 := ioutil.ReadAll(r)
1459         c.Check(err2, Equals, nil)
1460         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1461 }
1462
1463 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1464         st := &FailThenSucceedHandler{
1465                 handled: make(chan string, 1),
1466                 successhandler: &StubPutHandler{
1467                         c:                    c,
1468                         expectPath:           Md5String("foo"),
1469                         expectAPIToken:       "abc123",
1470                         expectBody:           "foo",
1471                         expectStorageClass:   "default",
1472                         returnStorageClasses: "",
1473                         handled:              make(chan string, 5),
1474                 },
1475         }
1476
1477         arv, _ := arvadosclient.MakeArvadosClient()
1478         kc, _ := MakeKeepClient(arv)
1479
1480         kc.Want_replicas = 2
1481         arv.ApiToken = "abc123"
1482         localRoots := make(map[string]string)
1483         writableLocalRoots := make(map[string]string)
1484
1485         ks := RunSomeFakeKeepServers(st, 2)
1486
1487         for i, k := range ks {
1488                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1489                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1490                 defer k.listener.Close()
1491         }
1492
1493         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1494
1495         hash, replicas, err := kc.PutB([]byte("foo"))
1496
1497         c.Check(err, Equals, nil)
1498         c.Check(hash, Equals, "")
1499         c.Check(replicas, Equals, 2)
1500 }
1501
1502 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1503         arv, err := arvadosclient.MakeArvadosClient()
1504         c.Assert(err, Equals, nil)
1505
1506         // Add an additional "testblobstore" keepservice
1507         blobKeepService := make(arvadosclient.Dict)
1508         err = arv.Create("keep_services",
1509                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1510                         "service_host": "localhost",
1511                         "service_port": "21321",
1512                         "service_type": "testblobstore"}},
1513                 &blobKeepService)
1514         c.Assert(err, Equals, nil)
1515         defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1516         RefreshServiceDiscovery()
1517
1518         // Make a keepclient and ensure that the testblobstore is included
1519         kc, err := MakeKeepClient(arv)
1520         c.Assert(err, Equals, nil)
1521
1522         // verify kc.LocalRoots
1523         c.Check(len(kc.LocalRoots()), Equals, 3)
1524         for _, root := range kc.LocalRoots() {
1525                 c.Check(root, Matches, "http://localhost:\\d+")
1526         }
1527         c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1528
1529         // verify kc.GatewayRoots
1530         c.Check(len(kc.GatewayRoots()), Equals, 3)
1531         for _, root := range kc.GatewayRoots() {
1532                 c.Check(root, Matches, "http://localhost:\\d+")
1533         }
1534         c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1535
1536         // verify kc.WritableLocalRoots
1537         c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1538         for _, root := range kc.WritableLocalRoots() {
1539                 c.Check(root, Matches, "http://localhost:\\d+")
1540         }
1541         c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1542
1543         c.Assert(kc.replicasPerService, Equals, 0)
1544         c.Assert(kc.foundNonDiskSvc, Equals, true)
1545         c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
1546 }