Merge branch '15317-metrics'
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "net"
16         "net/http"
17         "os"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "testing"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
26         "git.arvados.org/arvados.git/sdk/go/arvadostest"
27         . "gopkg.in/check.v1"
28 )
29
30 func Test(t *testing.T) {
31         DefaultRetryDelay = 50 * time.Millisecond
32         TestingT(t)
33 }
34
// Gocheck boilerplate: register both suites with gocheck.
var _ = Suite(&ServerRequiredSuite{})
var _ = Suite(&StandaloneSuite{})

// ServerRequiredSuite groups the tests that require the Keep server
// running. Its SetUpSuite calls arvadostest.StartKeep and its
// TearDownSuite calls arvadostest.StopKeep.
type ServerRequiredSuite struct{}

// StandaloneSuite groups the standalone tests. The tests visible in
// this file run against in-process stub HTTP servers.
type StandaloneSuite struct {
	// Values of the package-level DefaultRetryDelay and
	// MinimumRetryDelay captured by SetUpTest and written back by
	// TearDownTest.
	origDefaultRetryDelay time.Duration
	origMinimumRetryDelay time.Duration
}

// origHOME is the value of $HOME at package initialization. The
// tear-down hooks write it back after tests have pointed $HOME at a
// temporary directory.
var origHOME = os.Getenv("HOME")
49
50 func (s *StandaloneSuite) SetUpTest(c *C) {
51         RefreshServiceDiscovery()
52         // Prevent cache state from leaking between test cases
53         os.Setenv("HOME", c.MkDir())
54         s.origDefaultRetryDelay = DefaultRetryDelay
55         s.origMinimumRetryDelay = MinimumRetryDelay
56 }
57
58 func (s *StandaloneSuite) TearDownTest(c *C) {
59         os.Setenv("HOME", origHOME)
60         DefaultRetryDelay = s.origDefaultRetryDelay
61         MinimumRetryDelay = s.origMinimumRetryDelay
62 }
63
// pythonDir returns the current working directory with
// "/../../python/tests" appended. The path is not cleaned, and an
// error from os.Getwd is ignored (the result is then rooted at the
// empty string).
func pythonDir() string {
	const relTests = "/../../python/tests"
	workDir, _ := os.Getwd()
	return fmt.Sprintf("%s%s", workDir, relTests)
}
68
69 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
70         arvadostest.StartKeep(2, false)
71 }
72
73 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
74         arvadostest.StopKeep(2)
75         os.Setenv("HOME", origHOME)
76 }
77
78 func (s *ServerRequiredSuite) SetUpTest(c *C) {
79         RefreshServiceDiscovery()
80         // Prevent cache state from leaking between test cases
81         os.Setenv("HOME", c.MkDir())
82 }
83
84 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
85         arv, err := arvadosclient.MakeArvadosClient()
86         c.Assert(err, IsNil)
87
88         kc, err := MakeKeepClient(arv)
89
90         c.Assert(err, IsNil)
91         c.Check(len(kc.LocalRoots()), Equals, 2)
92         for _, root := range kc.LocalRoots() {
93                 c.Check(root, Matches, "http://localhost:\\d+")
94         }
95 }
96
97 func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) {
98         arv, err := arvadosclient.MakeArvadosClient()
99         c.Assert(err, IsNil)
100
101         cc, err := arv.ClusterConfig("StorageClasses")
102         c.Assert(err, IsNil)
103         c.Assert(cc, NotNil)
104         c.Assert(cc.(map[string]interface{})["default"], NotNil)
105
106         kc := New(arv)
107         c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"})
108 }
109
110 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
111         arv, err := arvadosclient.MakeArvadosClient()
112         c.Assert(err, IsNil)
113
114         kc, err := MakeKeepClient(arv)
115         c.Check(err, IsNil)
116         c.Assert(kc.Want_replicas, Equals, 2)
117
118         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
119         kc, err = MakeKeepClient(arv)
120         c.Check(err, IsNil)
121         c.Assert(kc.Want_replicas, Equals, 3)
122
123         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
124         kc, err = MakeKeepClient(arv)
125         c.Check(err, IsNil)
126         c.Assert(kc.Want_replicas, Equals, 1)
127 }
128
129 type StubPutHandler struct {
130         c                    *C
131         expectPath           string
132         expectAPIToken       string
133         expectBody           string
134         expectStorageClass   string
135         returnStorageClasses string
136         handled              chan string
137         requests             []*http.Request
138         mtx                  sync.Mutex
139 }
140
141 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
142         sph.mtx.Lock()
143         sph.requests = append(sph.requests, req)
144         sph.mtx.Unlock()
145         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
146         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
147         if sph.expectStorageClass != "*" {
148                 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
149         }
150         body, err := ioutil.ReadAll(req.Body)
151         sph.c.Check(err, IsNil)
152         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
153         resp.Header().Set("X-Keep-Replicas-Stored", "1")
154         if sph.returnStorageClasses != "" {
155                 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
156         }
157         resp.WriteHeader(200)
158         sph.handled <- fmt.Sprintf("http://%s", req.Host)
159 }
160
161 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
162         var err error
163         // If we don't explicitly bind it to localhost, ks.listener.Addr() will
164         // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
165         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
166         if err != nil {
167                 panic("Could not listen on any port")
168         }
169         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
170         go http.Serve(ks.listener, st)
171         return
172 }
173
174 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
175         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
176
177         ks := RunFakeKeepServer(st)
178         defer ks.listener.Close()
179
180         arv, _ := arvadosclient.MakeArvadosClient()
181         arv.ApiToken = "abc123"
182
183         kc, _ := MakeKeepClient(arv)
184
185         reader, writer := io.Pipe()
186         uploadStatusChan := make(chan uploadStatus)
187
188         f(kc, ks.url, reader, writer, uploadStatusChan)
189 }
190
191 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
192         log.Printf("TestUploadToStubKeepServer")
193
194         st := &StubPutHandler{
195                 c:                    c,
196                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
197                 expectAPIToken:       "abc123",
198                 expectBody:           "foo",
199                 expectStorageClass:   "",
200                 returnStorageClasses: "default=1",
201                 handled:              make(chan string),
202         }
203
204         UploadToStubHelper(c, st,
205                 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
206                         go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
207
208                         writer.Write([]byte("foo"))
209                         writer.Close()
210
211                         <-st.handled
212                         status := <-uploadStatusChan
213                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
214                 })
215 }
216
217 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
218         st := &StubPutHandler{
219                 c:                    c,
220                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
221                 expectAPIToken:       "abc123",
222                 expectBody:           "foo",
223                 expectStorageClass:   "",
224                 returnStorageClasses: "default=1",
225                 handled:              make(chan string),
226         }
227
228         UploadToStubHelper(c, st,
229                 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
230                         go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
231
232                         <-st.handled
233
234                         status := <-uploadStatusChan
235                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
236                 })
237 }
238
239 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
240         for _, trial := range []struct {
241                 respHeader string
242                 expectMap  map[string]int
243         }{
244                 {"", nil},
245                 {"foo=1", map[string]int{"foo": 1}},
246                 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
247                 {" =foo=1 ", nil},
248                 {"foo", nil},
249         } {
250                 st := &StubPutHandler{
251                         c:                    c,
252                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
253                         expectAPIToken:       "abc123",
254                         expectBody:           "foo",
255                         expectStorageClass:   "",
256                         returnStorageClasses: trial.respHeader,
257                         handled:              make(chan string),
258                 }
259
260                 UploadToStubHelper(c, st,
261                         func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
262                                 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
263
264                                 writer.Write([]byte("foo"))
265                                 writer.Close()
266
267                                 <-st.handled
268                                 status := <-uploadStatusChan
269                                 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
270                         })
271         }
272 }
273
274 func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) {
275         nServers := 5
276         for _, trial := range []struct {
277                 replicas      int
278                 clientClasses []string
279                 putClasses    []string
280                 minRequests   int
281                 maxRequests   int
282                 success       bool
283         }{
284                 // Talking to an older cluster (no default storage classes exported
285                 // config) and no other additional storage classes requirements.
286                 {1, nil, nil, 1, 1, true},
287                 {2, nil, nil, 2, 2, true},
288                 {3, nil, nil, 3, 3, true},
289                 {nServers*2 + 1, nil, nil, nServers, nServers, false},
290
291                 {1, []string{"class1"}, nil, 1, 1, true},
292                 {2, []string{"class1"}, nil, 2, 2, true},
293                 {3, []string{"class1"}, nil, 3, 3, true},
294                 {1, []string{"class1", "class2"}, nil, 1, 1, true},
295                 {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
296
297                 {1, nil, []string{"class1"}, 1, 1, true},
298                 {2, nil, []string{"class1"}, 2, 2, true},
299                 {3, nil, []string{"class1"}, 3, 3, true},
300                 {1, nil, []string{"class1", "class2"}, 1, 1, true},
301                 {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false},
302         } {
303                 c.Logf("%+v", trial)
304                 st := &StubPutHandler{
305                         c:                    c,
306                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
307                         expectAPIToken:       "abc123",
308                         expectBody:           "foo",
309                         expectStorageClass:   "*",
310                         returnStorageClasses: "", // Simulate old cluster without SC keep support
311                         handled:              make(chan string, 100),
312                 }
313                 ks := RunSomeFakeKeepServers(st, nServers)
314                 arv, _ := arvadosclient.MakeArvadosClient()
315                 kc, _ := MakeKeepClient(arv)
316                 kc.Want_replicas = trial.replicas
317                 kc.StorageClasses = trial.clientClasses
318                 kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults
319                 arv.ApiToken = "abc123"
320                 localRoots := make(map[string]string)
321                 writableLocalRoots := make(map[string]string)
322                 for i, k := range ks {
323                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
324                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
325                         defer k.listener.Close()
326                 }
327                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
328
329                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
330                         Data:           []byte("foo"),
331                         StorageClasses: trial.putClasses,
332                 })
333                 if trial.success {
334                         c.Check(err, IsNil)
335                 } else {
336                         c.Check(err, NotNil)
337                 }
338                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
339                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
340                 if trial.clientClasses == nil && trial.putClasses == nil {
341                         c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "")
342                 }
343         }
344 }
345
346 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
347         nServers := 5
348         for _, trial := range []struct {
349                 replicas       int
350                 defaultClasses []string
351                 clientClasses  []string // clientClasses takes precedence over defaultClasses
352                 putClasses     []string // putClasses takes precedence over clientClasses
353                 minRequests    int
354                 maxRequests    int
355                 success        bool
356         }{
357                 {1, []string{"class1"}, nil, nil, 1, 1, true},
358                 {2, []string{"class1"}, nil, nil, 1, 2, true},
359                 {3, []string{"class1"}, nil, nil, 2, 3, true},
360                 {1, []string{"class1", "class2"}, nil, nil, 1, 1, true},
361
362                 // defaultClasses doesn't matter when any of the others is specified.
363                 {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true},
364                 {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true},
365                 {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true},
366                 {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true},
367                 {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true},
368                 {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true},
369                 {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
370                 {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
371                 {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false},
372                 {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false},
373                 {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false},
374                 {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false},
375         } {
376                 c.Logf("%+v", trial)
377                 st := &StubPutHandler{
378                         c:                    c,
379                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
380                         expectAPIToken:       "abc123",
381                         expectBody:           "foo",
382                         expectStorageClass:   "*",
383                         returnStorageClasses: "class1=2, class2=2",
384                         handled:              make(chan string, 100),
385                 }
386                 ks := RunSomeFakeKeepServers(st, nServers)
387                 arv, _ := arvadosclient.MakeArvadosClient()
388                 kc, _ := MakeKeepClient(arv)
389                 kc.Want_replicas = trial.replicas
390                 kc.StorageClasses = trial.clientClasses
391                 kc.DefaultStorageClasses = trial.defaultClasses
392                 arv.ApiToken = "abc123"
393                 localRoots := make(map[string]string)
394                 writableLocalRoots := make(map[string]string)
395                 for i, k := range ks {
396                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
397                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
398                         defer k.listener.Close()
399                 }
400                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
401
402                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
403                         Data:           []byte("foo"),
404                         StorageClasses: trial.putClasses,
405                 })
406                 if trial.success {
407                         c.Check(err, IsNil)
408                 } else {
409                         c.Check(err, NotNil)
410                 }
411                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
412                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
413                 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) {
414                         // Max concurrency should be 1. First request
415                         // should have succeeded for class1. Second
416                         // request should only ask for class404.
417                         c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404")
418                 }
419         }
420 }
421
// FailHandler is an http.Handler that answers every request with
// status 500.
type FailHandler struct {
	handled chan string
}

// ServeHTTP writes a 500 status and then sends "http://<req.Host>" on
// the handled channel. The send blocks until a receiver is ready or
// buffer space is available.
func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	origin := fmt.Sprintf("http://%s", req.Host)
	resp.WriteHeader(http.StatusInternalServerError)
	fh.handled <- origin
}
430
// FailThenSucceedHandler is an http.Handler that answers the first
// morefails+1 requests with status 500 and delegates every later
// request to successhandler. The X-Request-Id header of every request
// is recorded in reqIDs.
type FailThenSucceedHandler struct {
	morefails      int // fail 1 + this many times before succeeding
	handled        chan string
	count          atomic.Int64
	successhandler http.Handler
	reqIDs         []string
	mtx            sync.Mutex // guards appends to reqIDs
}

// ServeHTTP records the request's X-Request-Id and then either fails
// the request with status 500, sending "http://<req.Host>" on the
// handled channel, or passes it to successhandler.
//
// net/http serves each connection in its own goroutine, so the append
// to reqIDs is done under mtx; the request counter is already atomic.
func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	fh.mtx.Lock()
	fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
	fh.mtx.Unlock()

	if int(fh.count.Add(1)) <= fh.morefails+1 {
		resp.WriteHeader(500)
		fh.handled <- fmt.Sprintf("http://%s", req.Host)
		return
	}
	fh.successhandler.ServeHTTP(resp, req)
}
448
// Error404Handler is an http.Handler that answers every request with
// status 404.
type Error404Handler struct {
	handled chan string
}

// ServeHTTP writes a 404 status and then sends "http://<req.Host>" on
// the handled channel. The send blocks until a receiver is ready or
// buffer space is available.
func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	origin := fmt.Sprintf("http://%s", req.Host)
	resp.WriteHeader(http.StatusNotFound)
	fh.handled <- origin
}
457
458 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
459         st := FailHandler{
460                 make(chan string)}
461
462         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
463
464         UploadToStubHelper(c, st,
465                 func(kc *KeepClient, url string, reader io.ReadCloser,
466                         writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
467
468                         go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
469
470                         writer.Write([]byte("foo"))
471                         writer.Close()
472
473                         <-st.handled
474
475                         status := <-uploadStatusChan
476                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
477                         c.Check(status.statusCode, Equals, 500)
478                 })
479 }
480
// KeepServer describes a fake Keep server started by
// RunFakeKeepServer.
type KeepServer struct {
	// listener is the socket the fake server accepts connections on;
	// tests close it to shut the server down.
	listener net.Listener
	// url is "http://" followed by the listener's address.
	url string
}
485
486 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
487         ks = make([]KeepServer, n)
488
489         for i := 0; i < n; i++ {
490                 ks[i] = RunFakeKeepServer(st)
491         }
492
493         return ks
494 }
495
496 func (s *StandaloneSuite) TestPutB(c *C) {
497         hash := Md5String("foo")
498
499         st := &StubPutHandler{
500                 c:                    c,
501                 expectPath:           hash,
502                 expectAPIToken:       "abc123",
503                 expectBody:           "foo",
504                 expectStorageClass:   "default",
505                 returnStorageClasses: "",
506                 handled:              make(chan string, 5),
507         }
508
509         arv, _ := arvadosclient.MakeArvadosClient()
510         kc, _ := MakeKeepClient(arv)
511
512         kc.Want_replicas = 2
513         arv.ApiToken = "abc123"
514         localRoots := make(map[string]string)
515         writableLocalRoots := make(map[string]string)
516
517         ks := RunSomeFakeKeepServers(st, 5)
518
519         for i, k := range ks {
520                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
521                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
522                 defer k.listener.Close()
523         }
524
525         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
526
527         kc.PutB([]byte("foo"))
528
529         shuff := NewRootSorter(
530                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
531
532         s1 := <-st.handled
533         s2 := <-st.handled
534         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
535                 (s1 == shuff[1] && s2 == shuff[0]),
536                 Equals,
537                 true)
538 }
539
540 func (s *StandaloneSuite) TestPutHR(c *C) {
541         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
542
543         st := &StubPutHandler{
544                 c:                    c,
545                 expectPath:           hash,
546                 expectAPIToken:       "abc123",
547                 expectBody:           "foo",
548                 expectStorageClass:   "default",
549                 returnStorageClasses: "",
550                 handled:              make(chan string, 5),
551         }
552
553         arv, _ := arvadosclient.MakeArvadosClient()
554         kc, _ := MakeKeepClient(arv)
555
556         kc.Want_replicas = 2
557         arv.ApiToken = "abc123"
558         localRoots := make(map[string]string)
559         writableLocalRoots := make(map[string]string)
560
561         ks := RunSomeFakeKeepServers(st, 5)
562
563         for i, k := range ks {
564                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
565                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
566                 defer k.listener.Close()
567         }
568
569         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
570
571         kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)
572
573         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
574
575         s1 := <-st.handled
576         s2 := <-st.handled
577
578         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
579                 (s1 == shuff[1] && s2 == shuff[0]),
580                 Equals,
581                 true)
582 }
583
584 func (s *StandaloneSuite) TestPutWithFail(c *C) {
585         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
586
587         st := &StubPutHandler{
588                 c:                    c,
589                 expectPath:           hash,
590                 expectAPIToken:       "abc123",
591                 expectBody:           "foo",
592                 expectStorageClass:   "default",
593                 returnStorageClasses: "",
594                 handled:              make(chan string, 4),
595         }
596
597         fh := FailHandler{
598                 make(chan string, 1)}
599
600         arv, err := arvadosclient.MakeArvadosClient()
601         c.Check(err, IsNil)
602         kc, _ := MakeKeepClient(arv)
603
604         kc.Want_replicas = 2
605         arv.ApiToken = "abc123"
606         localRoots := make(map[string]string)
607         writableLocalRoots := make(map[string]string)
608
609         ks1 := RunSomeFakeKeepServers(st, 4)
610         ks2 := RunSomeFakeKeepServers(fh, 1)
611
612         for i, k := range ks1 {
613                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
614                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
615                 defer k.listener.Close()
616         }
617         for i, k := range ks2 {
618                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
619                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
620                 defer k.listener.Close()
621         }
622
623         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
624
625         shuff := NewRootSorter(
626                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
627         c.Logf("%+v", shuff)
628
629         phash, replicas, err := kc.PutB([]byte("foo"))
630
631         <-fh.handled
632
633         c.Check(err, IsNil)
634         c.Check(phash, Equals, "")
635         c.Check(replicas, Equals, 2)
636
637         s1 := <-st.handled
638         s2 := <-st.handled
639
640         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
641                 (s1 == shuff[2] && s2 == shuff[1]),
642                 Equals,
643                 true)
644 }
645
646 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
647         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
648
649         st := &StubPutHandler{
650                 c:                    c,
651                 expectPath:           hash,
652                 expectAPIToken:       "abc123",
653                 expectBody:           "foo",
654                 expectStorageClass:   "default",
655                 returnStorageClasses: "",
656                 handled:              make(chan string, 1),
657         }
658
659         fh := FailHandler{
660                 make(chan string, 4)}
661
662         arv, err := arvadosclient.MakeArvadosClient()
663         c.Check(err, IsNil)
664         kc, _ := MakeKeepClient(arv)
665
666         kc.Want_replicas = 2
667         kc.Retries = 0
668         arv.ApiToken = "abc123"
669         localRoots := make(map[string]string)
670         writableLocalRoots := make(map[string]string)
671
672         ks1 := RunSomeFakeKeepServers(st, 1)
673         ks2 := RunSomeFakeKeepServers(fh, 4)
674
675         for i, k := range ks1 {
676                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
677                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
678                 defer k.listener.Close()
679         }
680         for i, k := range ks2 {
681                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
682                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
683                 defer k.listener.Close()
684         }
685
686         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
687
688         _, replicas, err := kc.PutB([]byte("foo"))
689
690         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
691         c.Check(replicas, Equals, 1)
692         c.Check(<-st.handled, Equals, ks1[0].url)
693 }
694
695 type StubGetHandler struct {
696         c              *C
697         expectPath     string
698         expectAPIToken string
699         httpStatus     int
700         body           []byte
701 }
702
703 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
704         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
705         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
706         resp.WriteHeader(sgh.httpStatus)
707         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
708         resp.Write(sgh.body)
709 }
710
711 func (s *StandaloneSuite) TestGet(c *C) {
712         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
713
714         st := StubGetHandler{
715                 c,
716                 hash,
717                 "abc123",
718                 http.StatusOK,
719                 []byte("foo")}
720
721         ks := RunFakeKeepServer(st)
722         defer ks.listener.Close()
723
724         arv, err := arvadosclient.MakeArvadosClient()
725         c.Check(err, IsNil)
726         kc, _ := MakeKeepClient(arv)
727         arv.ApiToken = "abc123"
728         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
729
730         r, n, _, err := kc.Get(hash)
731         c.Assert(err, IsNil)
732         c.Check(n, Equals, int64(3))
733
734         content, err2 := ioutil.ReadAll(r)
735         c.Check(err2, IsNil)
736         c.Check(content, DeepEquals, []byte("foo"))
737         c.Check(r.Close(), IsNil)
738 }
739
740 func (s *StandaloneSuite) TestGet404(c *C) {
741         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
742
743         st := Error404Handler{make(chan string, 1)}
744
745         ks := RunFakeKeepServer(st)
746         defer ks.listener.Close()
747
748         arv, err := arvadosclient.MakeArvadosClient()
749         c.Check(err, IsNil)
750         kc, _ := MakeKeepClient(arv)
751         arv.ApiToken = "abc123"
752         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
753
754         r, n, _, err := kc.Get(hash)
755         c.Check(err, Equals, BlockNotFound)
756         c.Check(n, Equals, int64(0))
757         c.Check(r, IsNil)
758 }
759
760 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
761         st := Error404Handler{make(chan string, 1)}
762
763         ks := RunFakeKeepServer(st)
764         defer ks.listener.Close()
765
766         arv, err := arvadosclient.MakeArvadosClient()
767         c.Check(err, IsNil)
768         kc, _ := MakeKeepClient(arv)
769         arv.ApiToken = "abc123"
770         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
771
772         r, n, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
773         c.Check(err, IsNil)
774         c.Check(n, Equals, int64(0))
775         c.Assert(r, NotNil)
776         buf, err := ioutil.ReadAll(r)
777         c.Check(err, IsNil)
778         c.Check(buf, DeepEquals, []byte{})
779         c.Check(r.Close(), IsNil)
780 }
781
782 func (s *StandaloneSuite) TestGetFail(c *C) {
783         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
784
785         st := FailHandler{make(chan string, 1)}
786
787         ks := RunFakeKeepServer(st)
788         defer ks.listener.Close()
789
790         arv, err := arvadosclient.MakeArvadosClient()
791         c.Check(err, IsNil)
792         kc, _ := MakeKeepClient(arv)
793         arv.ApiToken = "abc123"
794         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
795         kc.Retries = 0
796
797         r, n, _, err := kc.Get(hash)
798         errNotFound, _ := err.(*ErrNotFound)
799         if c.Check(errNotFound, NotNil) {
800                 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
801                 c.Check(errNotFound.Temporary(), Equals, true)
802         }
803         c.Check(n, Equals, int64(0))
804         c.Check(r, IsNil)
805 }
806
807 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
808         defer func(origDefault, origMinimum time.Duration) {
809                 DefaultRetryDelay = origDefault
810                 MinimumRetryDelay = origMinimum
811         }(DefaultRetryDelay, MinimumRetryDelay)
812         DefaultRetryDelay = time.Second / 8
813         MinimumRetryDelay = time.Millisecond
814
815         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
816
817         for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
818                 c.Logf("=== initial delay %v", delay)
819
820                 st := &FailThenSucceedHandler{
821                         morefails: 2,
822                         handled:   make(chan string, 4),
823                         successhandler: StubGetHandler{
824                                 c,
825                                 hash,
826                                 "abc123",
827                                 http.StatusOK,
828                                 []byte("foo")}}
829
830                 ks := RunFakeKeepServer(st)
831                 defer ks.listener.Close()
832
833                 arv, err := arvadosclient.MakeArvadosClient()
834                 c.Check(err, IsNil)
835                 kc, _ := MakeKeepClient(arv)
836                 arv.ApiToken = "abc123"
837                 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
838                 kc.Retries = 3
839                 kc.RetryDelay = delay
840                 kc.DiskCacheSize = DiskCacheDisabled
841
842                 t0 := time.Now()
843                 r, n, _, err := kc.Get(hash)
844                 c.Assert(err, IsNil)
845                 c.Check(n, Equals, int64(3))
846                 elapsed := time.Since(t0)
847
848                 nonsleeptime := time.Second / 10
849                 expect := kc.RetryDelay
850                 if expect == 0 {
851                         expect = DefaultRetryDelay
852                 }
853                 min := MinimumRetryDelay * 3
854                 max := expect + expect*2 + expect*2*2 + nonsleeptime
855                 c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
856                 c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
857
858                 content, err := ioutil.ReadAll(r)
859                 c.Check(err, IsNil)
860                 c.Check(content, DeepEquals, []byte("foo"))
861                 c.Check(r.Close(), IsNil)
862
863                 c.Logf("%q", st.reqIDs)
864                 if c.Check(st.reqIDs, Not(HasLen), 0) {
865                         for _, reqid := range st.reqIDs {
866                                 c.Check(reqid, Not(Equals), "")
867                                 c.Check(reqid, Equals, st.reqIDs[0])
868                         }
869                 }
870         }
871 }
872
873 func (s *StandaloneSuite) TestGetNetError(c *C) {
874         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
875
876         arv, err := arvadosclient.MakeArvadosClient()
877         c.Check(err, IsNil)
878         kc, _ := MakeKeepClient(arv)
879         arv.ApiToken = "abc123"
880         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
881
882         r, n, _, err := kc.Get(hash)
883         errNotFound, _ := err.(*ErrNotFound)
884         if c.Check(errNotFound, NotNil) {
885                 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
886                 c.Check(errNotFound.Temporary(), Equals, true)
887         }
888         c.Check(n, Equals, int64(0))
889         c.Check(r, IsNil)
890 }
891
892 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
893         uuid := "zzzzz-bi6l4-123451234512345"
894         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
895
896         // This one shouldn't be used:
897         ks0 := RunFakeKeepServer(StubGetHandler{
898                 c,
899                 "error if used",
900                 "abc123",
901                 http.StatusOK,
902                 []byte("foo")})
903         defer ks0.listener.Close()
904         // This one should be used:
905         ks := RunFakeKeepServer(StubGetHandler{
906                 c,
907                 hash + "+K@" + uuid,
908                 "abc123",
909                 http.StatusOK,
910                 []byte("foo")})
911         defer ks.listener.Close()
912
913         arv, err := arvadosclient.MakeArvadosClient()
914         c.Check(err, IsNil)
915         kc, _ := MakeKeepClient(arv)
916         arv.ApiToken = "abc123"
917         kc.SetServiceRoots(
918                 map[string]string{"x": ks0.url},
919                 nil,
920                 map[string]string{uuid: ks.url})
921
922         r, n, _, err := kc.Get(hash + "+K@" + uuid)
923         c.Assert(err, IsNil)
924         c.Check(n, Equals, int64(3))
925
926         content, err := ioutil.ReadAll(r)
927         c.Check(err, IsNil)
928         c.Check(content, DeepEquals, []byte("foo"))
929         c.Check(r.Close(), IsNil)
930 }
931
932 // Use a service hint to fetch from a local disk service, overriding
933 // rendezvous probe order.
934 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
935         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
936         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
937
938         // This one shouldn't be used, although it appears first in
939         // rendezvous probe order:
940         ks0 := RunFakeKeepServer(StubGetHandler{
941                 c,
942                 "error if used",
943                 "abc123",
944                 http.StatusBadGateway,
945                 nil})
946         defer ks0.listener.Close()
947         // This one should be used:
948         ks := RunFakeKeepServer(StubGetHandler{
949                 c,
950                 hash + "+K@" + uuid,
951                 "abc123",
952                 http.StatusOK,
953                 []byte("foo")})
954         defer ks.listener.Close()
955
956         arv, err := arvadosclient.MakeArvadosClient()
957         c.Check(err, IsNil)
958         kc, _ := MakeKeepClient(arv)
959         arv.ApiToken = "abc123"
960         kc.SetServiceRoots(
961                 map[string]string{
962                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
963                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
964                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
965                         uuid:                          ks.url},
966                 nil,
967                 map[string]string{
968                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
969                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
970                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
971                         uuid:                          ks.url},
972         )
973
974         r, n, _, err := kc.Get(hash + "+K@" + uuid)
975         c.Assert(err, IsNil)
976         c.Check(n, Equals, int64(3))
977
978         content, err := ioutil.ReadAll(r)
979         c.Check(err, IsNil)
980         c.Check(content, DeepEquals, []byte("foo"))
981         c.Check(r.Close(), IsNil)
982 }
983
984 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
985         uuid := "zzzzz-bi6l4-123451234512345"
986         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
987
988         ksLocal := RunFakeKeepServer(StubGetHandler{
989                 c,
990                 hash + "+K@" + uuid,
991                 "abc123",
992                 http.StatusOK,
993                 []byte("foo")})
994         defer ksLocal.listener.Close()
995         ksGateway := RunFakeKeepServer(StubGetHandler{
996                 c,
997                 hash + "+K@" + uuid,
998                 "abc123",
999                 http.StatusInternalServerError,
1000                 []byte("Error")})
1001         defer ksGateway.listener.Close()
1002
1003         arv, err := arvadosclient.MakeArvadosClient()
1004         c.Check(err, IsNil)
1005         kc, _ := MakeKeepClient(arv)
1006         arv.ApiToken = "abc123"
1007         kc.SetServiceRoots(
1008                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
1009                 nil,
1010                 map[string]string{uuid: ksGateway.url})
1011
1012         r, n, _, err := kc.Get(hash + "+K@" + uuid)
1013         c.Assert(err, IsNil)
1014         c.Check(n, Equals, int64(3))
1015
1016         content, err := ioutil.ReadAll(r)
1017         c.Check(err, IsNil)
1018         c.Check(content, DeepEquals, []byte("foo"))
1019         c.Check(r.Close(), IsNil)
1020 }
1021
// BarHandler is an http.Handler that answers every request with the
// body "bar" and reports each handled request on its channel.
type BarHandler struct {
	// handled receives "http://" followed by the request's Host
	// after the response body has been written.
	handled chan string
}

// ServeHTTP writes "bar" to resp, then sends "http://<req.Host>" on
// h.handled. The send blocks if the channel has no free buffer slot
// and no receiver.
func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	payload := []byte("bar")
	resp.Write(payload)
	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
1030
1031 func (s *StandaloneSuite) TestChecksum(c *C) {
1032         foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1033         barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
1034
1035         st := BarHandler{make(chan string, 1)}
1036
1037         ks := RunFakeKeepServer(st)
1038         defer ks.listener.Close()
1039
1040         arv, err := arvadosclient.MakeArvadosClient()
1041         c.Check(err, IsNil)
1042         kc, _ := MakeKeepClient(arv)
1043         arv.ApiToken = "abc123"
1044         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1045
1046         r, n, _, err := kc.Get(barhash)
1047         if c.Check(err, IsNil) {
1048                 _, err = ioutil.ReadAll(r)
1049                 c.Check(n, Equals, int64(3))
1050                 c.Check(err, IsNil)
1051         }
1052
1053         select {
1054         case <-st.handled:
1055         case <-time.After(time.Second):
1056                 c.Fatal("timed out")
1057         }
1058
1059         r, n, _, err = kc.Get(foohash)
1060         if err == nil {
1061                 buf, readerr := ioutil.ReadAll(r)
1062                 c.Logf("%q", buf)
1063                 err = readerr
1064         }
1065         c.Check(err, Equals, BadChecksum)
1066
1067         select {
1068         case <-st.handled:
1069         case <-time.After(time.Second):
1070                 c.Fatal("timed out")
1071         }
1072 }
1073
1074 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
1075         content := []byte("waz")
1076         hash := fmt.Sprintf("%x+3", md5.Sum(content))
1077
1078         fh := Error404Handler{
1079                 make(chan string, 4)}
1080
1081         st := StubGetHandler{
1082                 c,
1083                 hash,
1084                 "abc123",
1085                 http.StatusOK,
1086                 content}
1087
1088         arv, err := arvadosclient.MakeArvadosClient()
1089         c.Check(err, IsNil)
1090         kc, _ := MakeKeepClient(arv)
1091         arv.ApiToken = "abc123"
1092         localRoots := make(map[string]string)
1093         writableLocalRoots := make(map[string]string)
1094
1095         ks1 := RunSomeFakeKeepServers(st, 1)
1096         ks2 := RunSomeFakeKeepServers(fh, 4)
1097
1098         for i, k := range ks1 {
1099                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1100                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1101                 defer k.listener.Close()
1102         }
1103         for i, k := range ks2 {
1104                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1105                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1106                 defer k.listener.Close()
1107         }
1108
1109         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1110         kc.Retries = 0
1111
1112         // This test works only if one of the failing services is
1113         // attempted before the succeeding service. Otherwise,
1114         // <-fh.handled below will just hang! (Probe order depends on
1115         // the choice of block content "waz" and the UUIDs of the fake
1116         // servers, so we just tried different strings until we found
1117         // an example that passes this Assert.)
1118         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
1119
1120         r, n, _, err := kc.Get(hash)
1121
1122         select {
1123         case <-fh.handled:
1124         case <-time.After(time.Second):
1125                 c.Fatal("timed out")
1126         }
1127         c.Assert(err, IsNil)
1128         c.Check(n, Equals, int64(3))
1129
1130         readContent, err2 := ioutil.ReadAll(r)
1131         c.Check(err2, IsNil)
1132         c.Check(readContent, DeepEquals, content)
1133         c.Check(r.Close(), IsNil)
1134 }
1135
1136 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
1137         content := []byte("TestPutGetHead")
1138
1139         arv, err := arvadosclient.MakeArvadosClient()
1140         c.Check(err, IsNil)
1141         kc, err := MakeKeepClient(arv)
1142         c.Assert(err, IsNil)
1143
1144         hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content))
1145
1146         {
1147                 n, _, err := kc.Ask(hash)
1148                 c.Check(err, Equals, BlockNotFound)
1149                 c.Check(n, Equals, int64(0))
1150         }
1151         {
1152                 hash2, replicas, err := kc.PutB(content)
1153                 c.Check(err, IsNil)
1154                 c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`)
1155                 c.Check(replicas, Equals, 2)
1156         }
1157         {
1158                 r, n, _, err := kc.Get(hash)
1159                 c.Check(err, IsNil)
1160                 c.Check(n, Equals, int64(len(content)))
1161                 if c.Check(r, NotNil) {
1162                         readContent, err := ioutil.ReadAll(r)
1163                         c.Check(err, IsNil)
1164                         if c.Check(len(readContent), Equals, len(content)) {
1165                                 c.Check(readContent, DeepEquals, content)
1166                         }
1167                         c.Check(r.Close(), IsNil)
1168                 }
1169         }
1170         {
1171                 n, url2, err := kc.Ask(hash)
1172                 c.Check(err, IsNil)
1173                 c.Check(n, Equals, int64(len(content)))
1174                 c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E")
1175         }
1176         {
1177                 loc, err := kc.LocalLocator(hash)
1178                 c.Check(err, IsNil)
1179                 c.Assert(len(loc) >= 32, Equals, true)
1180                 c.Check(loc[:32], Equals, hash[:32])
1181         }
1182         {
1183                 content := []byte("the perth county conspiracy")
1184                 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1185                 c.Check(loc, Equals, "")
1186                 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1187                 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
1188         }
1189 }
1190
// StubProxyHandler is an http.Handler that sets the response header
// "X-Keep-Replicas-Stored" to "2" on every request and reports each
// handled request on its channel.
type StubProxyHandler struct {
	// handled receives "http://" followed by the request's Host
	// once the header has been set.
	handled chan string
}

// ServeHTTP sets "X-Keep-Replicas-Stored: 2" on resp, then sends
// "http://<req.Host>" on h.handled. The send blocks if the channel has
// no free buffer slot and no receiver.
func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	hdr := resp.Header()
	hdr.Set("X-Keep-Replicas-Stored", "2")
	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
1199
1200 func (s *StandaloneSuite) TestPutProxy(c *C) {
1201         st := StubProxyHandler{make(chan string, 1)}
1202
1203         arv, err := arvadosclient.MakeArvadosClient()
1204         c.Check(err, IsNil)
1205         kc, _ := MakeKeepClient(arv)
1206
1207         kc.Want_replicas = 2
1208         arv.ApiToken = "abc123"
1209         localRoots := make(map[string]string)
1210         writableLocalRoots := make(map[string]string)
1211
1212         ks1 := RunSomeFakeKeepServers(st, 1)
1213
1214         for i, k := range ks1 {
1215                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1216                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1217                 defer k.listener.Close()
1218         }
1219
1220         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1221
1222         _, replicas, err := kc.PutB([]byte("foo"))
1223         <-st.handled
1224
1225         c.Check(err, IsNil)
1226         c.Check(replicas, Equals, 2)
1227 }
1228
1229 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1230         st := StubProxyHandler{make(chan string, 1)}
1231
1232         arv, err := arvadosclient.MakeArvadosClient()
1233         c.Check(err, IsNil)
1234         kc, _ := MakeKeepClient(arv)
1235
1236         kc.Want_replicas = 3
1237         arv.ApiToken = "abc123"
1238         localRoots := make(map[string]string)
1239         writableLocalRoots := make(map[string]string)
1240
1241         ks1 := RunSomeFakeKeepServers(st, 1)
1242
1243         for i, k := range ks1 {
1244                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1245                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1246                 defer k.listener.Close()
1247         }
1248         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1249
1250         _, replicas, err := kc.PutB([]byte("foo"))
1251         <-st.handled
1252
1253         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1254         c.Check(replicas, Equals, 2)
1255 }
1256
1257 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1258         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1259         c.Check(err, IsNil)
1260         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1261         c.Check(l.Size, Equals, 3)
1262         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
1263 }
1264
1265 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1266         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1267         c.Check(err, IsNil)
1268         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1269         c.Check(l.Size, Equals, -1)
1270         c.Check(l.Hints, DeepEquals, []string{})
1271 }
1272
1273 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1274         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1275         c.Check(err, IsNil)
1276         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1277         c.Check(l.Size, Equals, -1)
1278         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
1279 }
1280
1281 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1282         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1283         l, err := MakeLocator(str)
1284         c.Check(err, IsNil)
1285         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1286         c.Check(l.Size, Equals, 3)
1287         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1288         c.Check(l.String(), Equals, str)
1289 }
1290
1291 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1292         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1293         c.Check(err, Equals, InvalidLocatorError)
1294 }
1295
1296 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1297         hash := Md5String("foo")
1298
1299         st := &StubPutHandler{
1300                 c:                    c,
1301                 expectPath:           hash,
1302                 expectAPIToken:       "abc123",
1303                 expectBody:           "foo",
1304                 expectStorageClass:   "default",
1305                 returnStorageClasses: "",
1306                 handled:              make(chan string, 5),
1307         }
1308
1309         arv, _ := arvadosclient.MakeArvadosClient()
1310         kc, _ := MakeKeepClient(arv)
1311
1312         kc.Want_replicas = 2
1313         arv.ApiToken = "abc123"
1314         localRoots := make(map[string]string)
1315         writableLocalRoots := make(map[string]string)
1316
1317         ks := RunSomeFakeKeepServers(st, 5)
1318
1319         for i, k := range ks {
1320                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1321                 if i == 0 {
1322                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1323                 }
1324                 defer k.listener.Close()
1325         }
1326
1327         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1328
1329         _, replicas, err := kc.PutB([]byte("foo"))
1330
1331         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1332         c.Check(replicas, Equals, 1)
1333
1334         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1335 }
1336
1337 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1338         hash := Md5String("foo")
1339
1340         st := &StubPutHandler{
1341                 c:                    c,
1342                 expectPath:           hash,
1343                 expectAPIToken:       "abc123",
1344                 expectBody:           "foo",
1345                 expectStorageClass:   "",
1346                 returnStorageClasses: "",
1347                 handled:              make(chan string, 5),
1348         }
1349
1350         arv, _ := arvadosclient.MakeArvadosClient()
1351         kc, _ := MakeKeepClient(arv)
1352
1353         kc.Want_replicas = 2
1354         arv.ApiToken = "abc123"
1355         localRoots := make(map[string]string)
1356         writableLocalRoots := make(map[string]string)
1357
1358         ks := RunSomeFakeKeepServers(st, 5)
1359
1360         for i, k := range ks {
1361                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1362                 defer k.listener.Close()
1363         }
1364
1365         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1366
1367         _, replicas, err := kc.PutB([]byte("foo"))
1368
1369         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1370         c.Check(replicas, Equals, 0)
1371 }
1372
1373 type StubGetIndexHandler struct {
1374         c              *C
1375         expectPath     string
1376         expectAPIToken string
1377         httpStatus     int
1378         body           []byte
1379 }
1380
1381 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1382         h.c.Check(req.URL.Path, Equals, h.expectPath)
1383         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1384         resp.WriteHeader(h.httpStatus)
1385         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1386         resp.Write(h.body)
1387 }
1388
1389 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1390         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1391
1392         st := StubGetIndexHandler{
1393                 c,
1394                 "/index",
1395                 "abc123",
1396                 http.StatusOK,
1397                 []byte(hash + " 1443559274\n\n")}
1398
1399         ks := RunFakeKeepServer(st)
1400         defer ks.listener.Close()
1401
1402         arv, err := arvadosclient.MakeArvadosClient()
1403         c.Assert(err, IsNil)
1404         kc, err := MakeKeepClient(arv)
1405         c.Assert(err, IsNil)
1406         arv.ApiToken = "abc123"
1407         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1408
1409         r, err := kc.GetIndex("x", "")
1410         c.Check(err, IsNil)
1411
1412         content, err2 := ioutil.ReadAll(r)
1413         c.Check(err2, IsNil)
1414         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1415 }
1416
1417 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1418         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1419
1420         st := StubGetIndexHandler{
1421                 c,
1422                 "/index/" + hash[0:3],
1423                 "abc123",
1424                 http.StatusOK,
1425                 []byte(hash + " 1443559274\n\n")}
1426
1427         ks := RunFakeKeepServer(st)
1428         defer ks.listener.Close()
1429
1430         arv, err := arvadosclient.MakeArvadosClient()
1431         c.Check(err, IsNil)
1432         kc, _ := MakeKeepClient(arv)
1433         arv.ApiToken = "abc123"
1434         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1435
1436         r, err := kc.GetIndex("x", hash[0:3])
1437         c.Assert(err, IsNil)
1438
1439         content, err2 := ioutil.ReadAll(r)
1440         c.Check(err2, IsNil)
1441         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1442 }
1443
1444 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1445         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1446
1447         st := StubGetIndexHandler{
1448                 c,
1449                 "/index/" + hash[0:3],
1450                 "abc123",
1451                 http.StatusOK,
1452                 []byte(hash)}
1453
1454         ks := RunFakeKeepServer(st)
1455         defer ks.listener.Close()
1456
1457         arv, err := arvadosclient.MakeArvadosClient()
1458         c.Check(err, IsNil)
1459         kc, _ := MakeKeepClient(arv)
1460         arv.ApiToken = "abc123"
1461         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1462
1463         _, err = kc.GetIndex("x", hash[0:3])
1464         c.Check(err, Equals, ErrIncompleteIndex)
1465 }
1466
1467 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1468         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1469
1470         st := StubGetIndexHandler{
1471                 c,
1472                 "/index/" + hash[0:3],
1473                 "abc123",
1474                 http.StatusOK,
1475                 []byte(hash)}
1476
1477         ks := RunFakeKeepServer(st)
1478         defer ks.listener.Close()
1479
1480         arv, err := arvadosclient.MakeArvadosClient()
1481         c.Check(err, IsNil)
1482         kc, _ := MakeKeepClient(arv)
1483         arv.ApiToken = "abc123"
1484         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1485
1486         _, err = kc.GetIndex("y", hash[0:3])
1487         c.Check(err, Equals, ErrNoSuchKeepServer)
1488 }
1489
1490 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1491         st := StubGetIndexHandler{
1492                 c,
1493                 "/index/abcd",
1494                 "abc123",
1495                 http.StatusOK,
1496                 []byte("\n")}
1497
1498         ks := RunFakeKeepServer(st)
1499         defer ks.listener.Close()
1500
1501         arv, err := arvadosclient.MakeArvadosClient()
1502         c.Check(err, IsNil)
1503         kc, _ := MakeKeepClient(arv)
1504         arv.ApiToken = "abc123"
1505         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1506
1507         r, err := kc.GetIndex("x", "abcd")
1508         c.Check(err, IsNil)
1509
1510         content, err2 := ioutil.ReadAll(r)
1511         c.Check(err2, IsNil)
1512         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1513 }
1514
1515 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1516         DefaultRetryDelay = time.Second / 8
1517         MinimumRetryDelay = time.Millisecond
1518
1519         for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
1520                 c.Logf("=== initial delay %v", delay)
1521
1522                 st := &FailThenSucceedHandler{
1523                         morefails: 5, // handler will fail 6x in total, 3 for each server
1524                         handled:   make(chan string, 10),
1525                         successhandler: &StubPutHandler{
1526                                 c:                    c,
1527                                 expectPath:           Md5String("foo"),
1528                                 expectAPIToken:       "abc123",
1529                                 expectBody:           "foo",
1530                                 expectStorageClass:   "default",
1531                                 returnStorageClasses: "",
1532                                 handled:              make(chan string, 5),
1533                         },
1534                 }
1535
1536                 arv, _ := arvadosclient.MakeArvadosClient()
1537                 kc, _ := MakeKeepClient(arv)
1538                 kc.Retries = 3
1539                 kc.RetryDelay = delay
1540                 kc.DiskCacheSize = DiskCacheDisabled
1541                 kc.Want_replicas = 2
1542
1543                 arv.ApiToken = "abc123"
1544                 localRoots := make(map[string]string)
1545                 writableLocalRoots := make(map[string]string)
1546
1547                 ks := RunSomeFakeKeepServers(st, 2)
1548
1549                 for i, k := range ks {
1550                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1551                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1552                         defer k.listener.Close()
1553                 }
1554
1555                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1556
1557                 t0 := time.Now()
1558                 hash, replicas, err := kc.PutB([]byte("foo"))
1559
1560                 c.Check(err, IsNil)
1561                 c.Check(hash, Equals, "")
1562                 c.Check(replicas, Equals, 2)
1563                 elapsed := time.Since(t0)
1564
1565                 nonsleeptime := time.Second / 10
1566                 expect := kc.RetryDelay
1567                 if expect == 0 {
1568                         expect = DefaultRetryDelay
1569                 }
1570                 min := MinimumRetryDelay * 3
1571                 max := expect + expect*2 + expect*2*2
1572                 max += nonsleeptime
1573                 checkInterval(c, elapsed, min, max)
1574         }
1575 }
1576
1577 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1578         arv, err := arvadosclient.MakeArvadosClient()
1579         c.Assert(err, IsNil)
1580
1581         // Add an additional "testblobstore" keepservice
1582         blobKeepService := make(arvadosclient.Dict)
1583         err = arv.Create("keep_services",
1584                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1585                         "service_host": "localhost",
1586                         "service_port": "21321",
1587                         "service_type": "testblobstore"}},
1588                 &blobKeepService)
1589         c.Assert(err, IsNil)
1590         defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1591         RefreshServiceDiscovery()
1592
1593         // Make a keepclient and ensure that the testblobstore is included
1594         kc, err := MakeKeepClient(arv)
1595         c.Assert(err, IsNil)
1596
1597         // verify kc.LocalRoots
1598         c.Check(len(kc.LocalRoots()), Equals, 3)
1599         for _, root := range kc.LocalRoots() {
1600                 c.Check(root, Matches, "http://localhost:\\d+")
1601         }
1602         c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1603
1604         // verify kc.GatewayRoots
1605         c.Check(len(kc.GatewayRoots()), Equals, 3)
1606         for _, root := range kc.GatewayRoots() {
1607                 c.Check(root, Matches, "http://localhost:\\d+")
1608         }
1609         c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1610
1611         // verify kc.WritableLocalRoots
1612         c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1613         for _, root := range kc.WritableLocalRoots() {
1614                 c.Check(root, Matches, "http://localhost:\\d+")
1615         }
1616         c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1617
1618         c.Assert(kc.replicasPerService, Equals, 0)
1619         c.Assert(kc.foundNonDiskSvc, Equals, true)
1620         c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
1621 }
1622
1623 func (s *StandaloneSuite) TestDelayCalculator_Default(c *C) {
1624         MinimumRetryDelay = time.Second / 2
1625         DefaultRetryDelay = time.Second
1626
1627         dc := delayCalculator{InitialMaxDelay: 0}
1628         checkInterval(c, dc.Next(), time.Second/2, time.Second)
1629         checkInterval(c, dc.Next(), time.Second/2, time.Second*2)
1630         checkInterval(c, dc.Next(), time.Second/2, time.Second*4)
1631         checkInterval(c, dc.Next(), time.Second/2, time.Second*8)
1632         checkInterval(c, dc.Next(), time.Second/2, time.Second*10)
1633         checkInterval(c, dc.Next(), time.Second/2, time.Second*10)
1634 }
1635
1636 func (s *StandaloneSuite) TestDelayCalculator_SetInitial(c *C) {
1637         MinimumRetryDelay = time.Second / 2
1638         DefaultRetryDelay = time.Second
1639
1640         dc := delayCalculator{InitialMaxDelay: time.Second * 2}
1641         checkInterval(c, dc.Next(), time.Second/2, time.Second*2)
1642         checkInterval(c, dc.Next(), time.Second/2, time.Second*4)
1643         checkInterval(c, dc.Next(), time.Second/2, time.Second*8)
1644         checkInterval(c, dc.Next(), time.Second/2, time.Second*16)
1645         checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
1646         checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
1647         checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
1648 }
1649
1650 func (s *StandaloneSuite) TestDelayCalculator_EnsureSomeLongDelays(c *C) {
1651         dc := delayCalculator{InitialMaxDelay: time.Second * 5}
1652         var d time.Duration
1653         n := 4000
1654         for i := 0; i < n; i++ {
1655                 if i < 20 || i%10 == 0 {
1656                         c.Logf("i=%d, delay=%v", i, d)
1657                 }
1658                 if d = dc.Next(); d > dc.InitialMaxDelay*9 {
1659                         return
1660                 }
1661         }
1662         c.Errorf("after %d trials, never got a delay more than 90%% of expected max %d; last was %v", n, dc.InitialMaxDelay*10, d)
1663 }
1664
1665 // If InitialMaxDelay is less than MinimumRetryDelay/10, then delay is
1666 // always MinimumRetryDelay.
1667 func (s *StandaloneSuite) TestDelayCalculator_InitialLessThanMinimum(c *C) {
1668         MinimumRetryDelay = time.Second / 2
1669         dc := delayCalculator{InitialMaxDelay: time.Millisecond}
1670         for i := 0; i < 20; i++ {
1671                 c.Check(dc.Next(), Equals, time.Second/2)
1672         }
1673 }
1674
1675 func checkInterval(c *C, t, min, max time.Duration) {
1676         c.Check(t >= min, Equals, true, Commentf("got %v which is below expected min %v", t, min))
1677         c.Check(t <= max, Equals, true, Commentf("got %v which is above expected max %v", t, max))
1678 }