21023: Use known-good exponential backoff with jitter strategy.
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "net"
16         "net/http"
17         "os"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "testing"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
26         "git.arvados.org/arvados.git/sdk/go/arvadostest"
27         . "gopkg.in/check.v1"
28 )
29
30 func Test(t *testing.T) {
31         DefaultRetryDelay = 50 * time.Millisecond
32         TestingT(t)
33 }
34
35 // Gocheck boilerplate
36 var _ = Suite(&ServerRequiredSuite{})
37 var _ = Suite(&StandaloneSuite{})
38
39 // Tests that require the Keep server running
40 type ServerRequiredSuite struct{}
41
42 // Standalone tests
43 type StandaloneSuite struct{}
44
45 var origHOME = os.Getenv("HOME")
46
47 func (s *StandaloneSuite) SetUpTest(c *C) {
48         RefreshServiceDiscovery()
49         // Prevent cache state from leaking between test cases
50         os.Setenv("HOME", c.MkDir())
51 }
52
53 func (s *StandaloneSuite) TearDownTest(c *C) {
54         os.Setenv("HOME", origHOME)
55 }
56
57 func pythonDir() string {
58         cwd, _ := os.Getwd()
59         return fmt.Sprintf("%s/../../python/tests", cwd)
60 }
61
62 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
63         arvadostest.StartKeep(2, false)
64 }
65
66 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
67         arvadostest.StopKeep(2)
68         os.Setenv("HOME", origHOME)
69 }
70
71 func (s *ServerRequiredSuite) SetUpTest(c *C) {
72         RefreshServiceDiscovery()
73         // Prevent cache state from leaking between test cases
74         os.Setenv("HOME", c.MkDir())
75 }
76
77 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
78         arv, err := arvadosclient.MakeArvadosClient()
79         c.Assert(err, IsNil)
80
81         kc, err := MakeKeepClient(arv)
82
83         c.Assert(err, IsNil)
84         c.Check(len(kc.LocalRoots()), Equals, 2)
85         for _, root := range kc.LocalRoots() {
86                 c.Check(root, Matches, "http://localhost:\\d+")
87         }
88 }
89
90 func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) {
91         arv, err := arvadosclient.MakeArvadosClient()
92         c.Assert(err, IsNil)
93
94         cc, err := arv.ClusterConfig("StorageClasses")
95         c.Assert(err, IsNil)
96         c.Assert(cc, NotNil)
97         c.Assert(cc.(map[string]interface{})["default"], NotNil)
98
99         kc := New(arv)
100         c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"})
101 }
102
103 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
104         arv, err := arvadosclient.MakeArvadosClient()
105         c.Assert(err, IsNil)
106
107         kc, err := MakeKeepClient(arv)
108         c.Check(err, IsNil)
109         c.Assert(kc.Want_replicas, Equals, 2)
110
111         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
112         kc, err = MakeKeepClient(arv)
113         c.Check(err, IsNil)
114         c.Assert(kc.Want_replicas, Equals, 3)
115
116         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
117         kc, err = MakeKeepClient(arv)
118         c.Check(err, IsNil)
119         c.Assert(kc.Want_replicas, Equals, 1)
120 }
121
122 type StubPutHandler struct {
123         c                    *C
124         expectPath           string
125         expectAPIToken       string
126         expectBody           string
127         expectStorageClass   string
128         returnStorageClasses string
129         handled              chan string
130         requests             []*http.Request
131         mtx                  sync.Mutex
132 }
133
134 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
135         sph.mtx.Lock()
136         sph.requests = append(sph.requests, req)
137         sph.mtx.Unlock()
138         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
139         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
140         if sph.expectStorageClass != "*" {
141                 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
142         }
143         body, err := ioutil.ReadAll(req.Body)
144         sph.c.Check(err, IsNil)
145         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
146         resp.Header().Set("X-Keep-Replicas-Stored", "1")
147         if sph.returnStorageClasses != "" {
148                 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
149         }
150         resp.WriteHeader(200)
151         sph.handled <- fmt.Sprintf("http://%s", req.Host)
152 }
153
154 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
155         var err error
156         // If we don't explicitly bind it to localhost, ks.listener.Addr() will
157         // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
158         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
159         if err != nil {
160                 panic("Could not listen on any port")
161         }
162         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
163         go http.Serve(ks.listener, st)
164         return
165 }
166
167 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
168         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
169
170         ks := RunFakeKeepServer(st)
171         defer ks.listener.Close()
172
173         arv, _ := arvadosclient.MakeArvadosClient()
174         arv.ApiToken = "abc123"
175
176         kc, _ := MakeKeepClient(arv)
177
178         reader, writer := io.Pipe()
179         uploadStatusChan := make(chan uploadStatus)
180
181         f(kc, ks.url, reader, writer, uploadStatusChan)
182 }
183
184 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
185         log.Printf("TestUploadToStubKeepServer")
186
187         st := &StubPutHandler{
188                 c:                    c,
189                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
190                 expectAPIToken:       "abc123",
191                 expectBody:           "foo",
192                 expectStorageClass:   "",
193                 returnStorageClasses: "default=1",
194                 handled:              make(chan string),
195         }
196
197         UploadToStubHelper(c, st,
198                 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
199                         go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
200
201                         writer.Write([]byte("foo"))
202                         writer.Close()
203
204                         <-st.handled
205                         status := <-uploadStatusChan
206                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
207                 })
208 }
209
210 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
211         st := &StubPutHandler{
212                 c:                    c,
213                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
214                 expectAPIToken:       "abc123",
215                 expectBody:           "foo",
216                 expectStorageClass:   "",
217                 returnStorageClasses: "default=1",
218                 handled:              make(chan string),
219         }
220
221         UploadToStubHelper(c, st,
222                 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
223                         go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
224
225                         <-st.handled
226
227                         status := <-uploadStatusChan
228                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
229                 })
230 }
231
232 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
233         for _, trial := range []struct {
234                 respHeader string
235                 expectMap  map[string]int
236         }{
237                 {"", nil},
238                 {"foo=1", map[string]int{"foo": 1}},
239                 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
240                 {" =foo=1 ", nil},
241                 {"foo", nil},
242         } {
243                 st := &StubPutHandler{
244                         c:                    c,
245                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
246                         expectAPIToken:       "abc123",
247                         expectBody:           "foo",
248                         expectStorageClass:   "",
249                         returnStorageClasses: trial.respHeader,
250                         handled:              make(chan string),
251                 }
252
253                 UploadToStubHelper(c, st,
254                         func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
255                                 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
256
257                                 writer.Write([]byte("foo"))
258                                 writer.Close()
259
260                                 <-st.handled
261                                 status := <-uploadStatusChan
262                                 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
263                         })
264         }
265 }
266
267 func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) {
268         nServers := 5
269         for _, trial := range []struct {
270                 replicas      int
271                 clientClasses []string
272                 putClasses    []string
273                 minRequests   int
274                 maxRequests   int
275                 success       bool
276         }{
277                 // Talking to an older cluster (no default storage classes exported
278                 // config) and no other additional storage classes requirements.
279                 {1, nil, nil, 1, 1, true},
280                 {2, nil, nil, 2, 2, true},
281                 {3, nil, nil, 3, 3, true},
282                 {nServers*2 + 1, nil, nil, nServers, nServers, false},
283
284                 {1, []string{"class1"}, nil, 1, 1, true},
285                 {2, []string{"class1"}, nil, 2, 2, true},
286                 {3, []string{"class1"}, nil, 3, 3, true},
287                 {1, []string{"class1", "class2"}, nil, 1, 1, true},
288                 {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
289
290                 {1, nil, []string{"class1"}, 1, 1, true},
291                 {2, nil, []string{"class1"}, 2, 2, true},
292                 {3, nil, []string{"class1"}, 3, 3, true},
293                 {1, nil, []string{"class1", "class2"}, 1, 1, true},
294                 {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false},
295         } {
296                 c.Logf("%+v", trial)
297                 st := &StubPutHandler{
298                         c:                    c,
299                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
300                         expectAPIToken:       "abc123",
301                         expectBody:           "foo",
302                         expectStorageClass:   "*",
303                         returnStorageClasses: "", // Simulate old cluster without SC keep support
304                         handled:              make(chan string, 100),
305                 }
306                 ks := RunSomeFakeKeepServers(st, nServers)
307                 arv, _ := arvadosclient.MakeArvadosClient()
308                 kc, _ := MakeKeepClient(arv)
309                 kc.Want_replicas = trial.replicas
310                 kc.StorageClasses = trial.clientClasses
311                 kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults
312                 arv.ApiToken = "abc123"
313                 localRoots := make(map[string]string)
314                 writableLocalRoots := make(map[string]string)
315                 for i, k := range ks {
316                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
317                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
318                         defer k.listener.Close()
319                 }
320                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
321
322                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
323                         Data:           []byte("foo"),
324                         StorageClasses: trial.putClasses,
325                 })
326                 if trial.success {
327                         c.Check(err, IsNil)
328                 } else {
329                         c.Check(err, NotNil)
330                 }
331                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
332                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
333                 if trial.clientClasses == nil && trial.putClasses == nil {
334                         c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "")
335                 }
336         }
337 }
338
339 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
340         nServers := 5
341         for _, trial := range []struct {
342                 replicas       int
343                 defaultClasses []string
344                 clientClasses  []string // clientClasses takes precedence over defaultClasses
345                 putClasses     []string // putClasses takes precedence over clientClasses
346                 minRequests    int
347                 maxRequests    int
348                 success        bool
349         }{
350                 {1, []string{"class1"}, nil, nil, 1, 1, true},
351                 {2, []string{"class1"}, nil, nil, 1, 2, true},
352                 {3, []string{"class1"}, nil, nil, 2, 3, true},
353                 {1, []string{"class1", "class2"}, nil, nil, 1, 1, true},
354
355                 // defaultClasses doesn't matter when any of the others is specified.
356                 {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true},
357                 {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true},
358                 {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true},
359                 {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true},
360                 {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true},
361                 {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true},
362                 {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
363                 {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
364                 {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false},
365                 {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false},
366                 {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false},
367                 {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false},
368         } {
369                 c.Logf("%+v", trial)
370                 st := &StubPutHandler{
371                         c:                    c,
372                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
373                         expectAPIToken:       "abc123",
374                         expectBody:           "foo",
375                         expectStorageClass:   "*",
376                         returnStorageClasses: "class1=2, class2=2",
377                         handled:              make(chan string, 100),
378                 }
379                 ks := RunSomeFakeKeepServers(st, nServers)
380                 arv, _ := arvadosclient.MakeArvadosClient()
381                 kc, _ := MakeKeepClient(arv)
382                 kc.Want_replicas = trial.replicas
383                 kc.StorageClasses = trial.clientClasses
384                 kc.DefaultStorageClasses = trial.defaultClasses
385                 arv.ApiToken = "abc123"
386                 localRoots := make(map[string]string)
387                 writableLocalRoots := make(map[string]string)
388                 for i, k := range ks {
389                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
390                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
391                         defer k.listener.Close()
392                 }
393                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
394
395                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
396                         Data:           []byte("foo"),
397                         StorageClasses: trial.putClasses,
398                 })
399                 if trial.success {
400                         c.Check(err, IsNil)
401                 } else {
402                         c.Check(err, NotNil)
403                 }
404                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
405                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
406                 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) {
407                         // Max concurrency should be 1. First request
408                         // should have succeeded for class1. Second
409                         // request should only ask for class404.
410                         c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404")
411                 }
412         }
413 }
414
415 type FailHandler struct {
416         handled chan string
417 }
418
419 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
420         resp.WriteHeader(500)
421         fh.handled <- fmt.Sprintf("http://%s", req.Host)
422 }
423
424 type FailThenSucceedHandler struct {
425         morefails      int // fail 1 + this many times before succeeding
426         handled        chan string
427         count          atomic.Int64
428         successhandler http.Handler
429         reqIDs         []string
430 }
431
432 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
433         fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
434         if int(fh.count.Add(1)) <= fh.morefails+1 {
435                 resp.WriteHeader(500)
436                 fh.handled <- fmt.Sprintf("http://%s", req.Host)
437         } else {
438                 fh.successhandler.ServeHTTP(resp, req)
439         }
440 }
441
442 type Error404Handler struct {
443         handled chan string
444 }
445
446 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
447         resp.WriteHeader(404)
448         fh.handled <- fmt.Sprintf("http://%s", req.Host)
449 }
450
451 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
452         st := FailHandler{
453                 make(chan string)}
454
455         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
456
457         UploadToStubHelper(c, st,
458                 func(kc *KeepClient, url string, reader io.ReadCloser,
459                         writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
460
461                         go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
462
463                         writer.Write([]byte("foo"))
464                         writer.Close()
465
466                         <-st.handled
467
468                         status := <-uploadStatusChan
469                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
470                         c.Check(status.statusCode, Equals, 500)
471                 })
472 }
473
474 type KeepServer struct {
475         listener net.Listener
476         url      string
477 }
478
479 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
480         ks = make([]KeepServer, n)
481
482         for i := 0; i < n; i++ {
483                 ks[i] = RunFakeKeepServer(st)
484         }
485
486         return ks
487 }
488
489 func (s *StandaloneSuite) TestPutB(c *C) {
490         hash := Md5String("foo")
491
492         st := &StubPutHandler{
493                 c:                    c,
494                 expectPath:           hash,
495                 expectAPIToken:       "abc123",
496                 expectBody:           "foo",
497                 expectStorageClass:   "default",
498                 returnStorageClasses: "",
499                 handled:              make(chan string, 5),
500         }
501
502         arv, _ := arvadosclient.MakeArvadosClient()
503         kc, _ := MakeKeepClient(arv)
504
505         kc.Want_replicas = 2
506         arv.ApiToken = "abc123"
507         localRoots := make(map[string]string)
508         writableLocalRoots := make(map[string]string)
509
510         ks := RunSomeFakeKeepServers(st, 5)
511
512         for i, k := range ks {
513                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
514                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
515                 defer k.listener.Close()
516         }
517
518         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
519
520         kc.PutB([]byte("foo"))
521
522         shuff := NewRootSorter(
523                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
524
525         s1 := <-st.handled
526         s2 := <-st.handled
527         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
528                 (s1 == shuff[1] && s2 == shuff[0]),
529                 Equals,
530                 true)
531 }
532
533 func (s *StandaloneSuite) TestPutHR(c *C) {
534         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
535
536         st := &StubPutHandler{
537                 c:                    c,
538                 expectPath:           hash,
539                 expectAPIToken:       "abc123",
540                 expectBody:           "foo",
541                 expectStorageClass:   "default",
542                 returnStorageClasses: "",
543                 handled:              make(chan string, 5),
544         }
545
546         arv, _ := arvadosclient.MakeArvadosClient()
547         kc, _ := MakeKeepClient(arv)
548
549         kc.Want_replicas = 2
550         arv.ApiToken = "abc123"
551         localRoots := make(map[string]string)
552         writableLocalRoots := make(map[string]string)
553
554         ks := RunSomeFakeKeepServers(st, 5)
555
556         for i, k := range ks {
557                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
558                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
559                 defer k.listener.Close()
560         }
561
562         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
563
564         kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)
565
566         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
567
568         s1 := <-st.handled
569         s2 := <-st.handled
570
571         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
572                 (s1 == shuff[1] && s2 == shuff[0]),
573                 Equals,
574                 true)
575 }
576
577 func (s *StandaloneSuite) TestPutWithFail(c *C) {
578         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
579
580         st := &StubPutHandler{
581                 c:                    c,
582                 expectPath:           hash,
583                 expectAPIToken:       "abc123",
584                 expectBody:           "foo",
585                 expectStorageClass:   "default",
586                 returnStorageClasses: "",
587                 handled:              make(chan string, 4),
588         }
589
590         fh := FailHandler{
591                 make(chan string, 1)}
592
593         arv, err := arvadosclient.MakeArvadosClient()
594         c.Check(err, IsNil)
595         kc, _ := MakeKeepClient(arv)
596
597         kc.Want_replicas = 2
598         arv.ApiToken = "abc123"
599         localRoots := make(map[string]string)
600         writableLocalRoots := make(map[string]string)
601
602         ks1 := RunSomeFakeKeepServers(st, 4)
603         ks2 := RunSomeFakeKeepServers(fh, 1)
604
605         for i, k := range ks1 {
606                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
607                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
608                 defer k.listener.Close()
609         }
610         for i, k := range ks2 {
611                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
612                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
613                 defer k.listener.Close()
614         }
615
616         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
617
618         shuff := NewRootSorter(
619                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
620         c.Logf("%+v", shuff)
621
622         phash, replicas, err := kc.PutB([]byte("foo"))
623
624         <-fh.handled
625
626         c.Check(err, IsNil)
627         c.Check(phash, Equals, "")
628         c.Check(replicas, Equals, 2)
629
630         s1 := <-st.handled
631         s2 := <-st.handled
632
633         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
634                 (s1 == shuff[2] && s2 == shuff[1]),
635                 Equals,
636                 true)
637 }
638
639 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
640         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
641
642         st := &StubPutHandler{
643                 c:                    c,
644                 expectPath:           hash,
645                 expectAPIToken:       "abc123",
646                 expectBody:           "foo",
647                 expectStorageClass:   "default",
648                 returnStorageClasses: "",
649                 handled:              make(chan string, 1),
650         }
651
652         fh := FailHandler{
653                 make(chan string, 4)}
654
655         arv, err := arvadosclient.MakeArvadosClient()
656         c.Check(err, IsNil)
657         kc, _ := MakeKeepClient(arv)
658
659         kc.Want_replicas = 2
660         kc.Retries = 0
661         arv.ApiToken = "abc123"
662         localRoots := make(map[string]string)
663         writableLocalRoots := make(map[string]string)
664
665         ks1 := RunSomeFakeKeepServers(st, 1)
666         ks2 := RunSomeFakeKeepServers(fh, 4)
667
668         for i, k := range ks1 {
669                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
670                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
671                 defer k.listener.Close()
672         }
673         for i, k := range ks2 {
674                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
675                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
676                 defer k.listener.Close()
677         }
678
679         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
680
681         _, replicas, err := kc.PutB([]byte("foo"))
682
683         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
684         c.Check(replicas, Equals, 1)
685         c.Check(<-st.handled, Equals, ks1[0].url)
686 }
687
688 type StubGetHandler struct {
689         c              *C
690         expectPath     string
691         expectAPIToken string
692         httpStatus     int
693         body           []byte
694 }
695
696 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
697         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
698         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
699         resp.WriteHeader(sgh.httpStatus)
700         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
701         resp.Write(sgh.body)
702 }
703
704 func (s *StandaloneSuite) TestGet(c *C) {
705         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
706
707         st := StubGetHandler{
708                 c,
709                 hash,
710                 "abc123",
711                 http.StatusOK,
712                 []byte("foo")}
713
714         ks := RunFakeKeepServer(st)
715         defer ks.listener.Close()
716
717         arv, err := arvadosclient.MakeArvadosClient()
718         c.Check(err, IsNil)
719         kc, _ := MakeKeepClient(arv)
720         arv.ApiToken = "abc123"
721         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
722
723         r, n, _, err := kc.Get(hash)
724         c.Assert(err, IsNil)
725         c.Check(n, Equals, int64(3))
726
727         content, err2 := ioutil.ReadAll(r)
728         c.Check(err2, IsNil)
729         c.Check(content, DeepEquals, []byte("foo"))
730         c.Check(r.Close(), IsNil)
731 }
732
733 func (s *StandaloneSuite) TestGet404(c *C) {
734         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
735
736         st := Error404Handler{make(chan string, 1)}
737
738         ks := RunFakeKeepServer(st)
739         defer ks.listener.Close()
740
741         arv, err := arvadosclient.MakeArvadosClient()
742         c.Check(err, IsNil)
743         kc, _ := MakeKeepClient(arv)
744         arv.ApiToken = "abc123"
745         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
746
747         r, n, _, err := kc.Get(hash)
748         c.Check(err, Equals, BlockNotFound)
749         c.Check(n, Equals, int64(0))
750         c.Check(r, IsNil)
751 }
752
753 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
754         st := Error404Handler{make(chan string, 1)}
755
756         ks := RunFakeKeepServer(st)
757         defer ks.listener.Close()
758
759         arv, err := arvadosclient.MakeArvadosClient()
760         c.Check(err, IsNil)
761         kc, _ := MakeKeepClient(arv)
762         arv.ApiToken = "abc123"
763         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
764
765         r, n, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
766         c.Check(err, IsNil)
767         c.Check(n, Equals, int64(0))
768         c.Assert(r, NotNil)
769         buf, err := ioutil.ReadAll(r)
770         c.Check(err, IsNil)
771         c.Check(buf, DeepEquals, []byte{})
772         c.Check(r.Close(), IsNil)
773 }
774
775 func (s *StandaloneSuite) TestGetFail(c *C) {
776         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
777
778         st := FailHandler{make(chan string, 1)}
779
780         ks := RunFakeKeepServer(st)
781         defer ks.listener.Close()
782
783         arv, err := arvadosclient.MakeArvadosClient()
784         c.Check(err, IsNil)
785         kc, _ := MakeKeepClient(arv)
786         arv.ApiToken = "abc123"
787         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
788         kc.Retries = 0
789
790         r, n, _, err := kc.Get(hash)
791         errNotFound, _ := err.(*ErrNotFound)
792         if c.Check(errNotFound, NotNil) {
793                 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
794                 c.Check(errNotFound.Temporary(), Equals, true)
795         }
796         c.Check(n, Equals, int64(0))
797         c.Check(r, IsNil)
798 }
799
800 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
801         defer func(origDefault, origMinimum time.Duration) {
802                 DefaultRetryDelay = origDefault
803                 MinimumRetryDelay = origMinimum
804         }(DefaultRetryDelay, MinimumRetryDelay)
805         DefaultRetryDelay = time.Second / 8
806         MinimumRetryDelay = time.Millisecond
807
808         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
809
810         for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
811                 c.Logf("=== initial delay %v", delay)
812
813                 st := &FailThenSucceedHandler{
814                         morefails: 2,
815                         handled:   make(chan string, 4),
816                         successhandler: StubGetHandler{
817                                 c,
818                                 hash,
819                                 "abc123",
820                                 http.StatusOK,
821                                 []byte("foo")}}
822
823                 ks := RunFakeKeepServer(st)
824                 defer ks.listener.Close()
825
826                 arv, err := arvadosclient.MakeArvadosClient()
827                 c.Check(err, IsNil)
828                 kc, _ := MakeKeepClient(arv)
829                 arv.ApiToken = "abc123"
830                 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
831                 kc.Retries = 3
832                 kc.RetryDelay = delay
833                 kc.DiskCacheSize = DiskCacheDisabled
834
835                 t0 := time.Now()
836                 r, n, _, err := kc.Get(hash)
837                 c.Assert(err, IsNil)
838                 c.Check(n, Equals, int64(3))
839                 elapsed := time.Since(t0)
840
841                 nonsleeptime := time.Second / 10
842                 expect := kc.RetryDelay
843                 if expect == 0 {
844                         expect = DefaultRetryDelay
845                 }
846                 min := MinimumRetryDelay * 3
847                 max := expect + expect*2 + expect*2*2 + nonsleeptime
848                 c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
849                 c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
850
851                 content, err := ioutil.ReadAll(r)
852                 c.Check(err, IsNil)
853                 c.Check(content, DeepEquals, []byte("foo"))
854                 c.Check(r.Close(), IsNil)
855
856                 c.Logf("%q", st.reqIDs)
857                 if c.Check(st.reqIDs, Not(HasLen), 0) {
858                         for _, reqid := range st.reqIDs {
859                                 c.Check(reqid, Not(Equals), "")
860                                 c.Check(reqid, Equals, st.reqIDs[0])
861                         }
862                 }
863         }
864 }
865
866 func (s *StandaloneSuite) TestGetNetError(c *C) {
867         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
868
869         arv, err := arvadosclient.MakeArvadosClient()
870         c.Check(err, IsNil)
871         kc, _ := MakeKeepClient(arv)
872         arv.ApiToken = "abc123"
873         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
874
875         r, n, _, err := kc.Get(hash)
876         errNotFound, _ := err.(*ErrNotFound)
877         if c.Check(errNotFound, NotNil) {
878                 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
879                 c.Check(errNotFound.Temporary(), Equals, true)
880         }
881         c.Check(n, Equals, int64(0))
882         c.Check(r, IsNil)
883 }
884
885 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
886         uuid := "zzzzz-bi6l4-123451234512345"
887         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
888
889         // This one shouldn't be used:
890         ks0 := RunFakeKeepServer(StubGetHandler{
891                 c,
892                 "error if used",
893                 "abc123",
894                 http.StatusOK,
895                 []byte("foo")})
896         defer ks0.listener.Close()
897         // This one should be used:
898         ks := RunFakeKeepServer(StubGetHandler{
899                 c,
900                 hash + "+K@" + uuid,
901                 "abc123",
902                 http.StatusOK,
903                 []byte("foo")})
904         defer ks.listener.Close()
905
906         arv, err := arvadosclient.MakeArvadosClient()
907         c.Check(err, IsNil)
908         kc, _ := MakeKeepClient(arv)
909         arv.ApiToken = "abc123"
910         kc.SetServiceRoots(
911                 map[string]string{"x": ks0.url},
912                 nil,
913                 map[string]string{uuid: ks.url})
914
915         r, n, _, err := kc.Get(hash + "+K@" + uuid)
916         c.Assert(err, IsNil)
917         c.Check(n, Equals, int64(3))
918
919         content, err := ioutil.ReadAll(r)
920         c.Check(err, IsNil)
921         c.Check(content, DeepEquals, []byte("foo"))
922         c.Check(r.Close(), IsNil)
923 }
924
925 // Use a service hint to fetch from a local disk service, overriding
926 // rendezvous probe order.
927 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
928         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
929         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
930
931         // This one shouldn't be used, although it appears first in
932         // rendezvous probe order:
933         ks0 := RunFakeKeepServer(StubGetHandler{
934                 c,
935                 "error if used",
936                 "abc123",
937                 http.StatusBadGateway,
938                 nil})
939         defer ks0.listener.Close()
940         // This one should be used:
941         ks := RunFakeKeepServer(StubGetHandler{
942                 c,
943                 hash + "+K@" + uuid,
944                 "abc123",
945                 http.StatusOK,
946                 []byte("foo")})
947         defer ks.listener.Close()
948
949         arv, err := arvadosclient.MakeArvadosClient()
950         c.Check(err, IsNil)
951         kc, _ := MakeKeepClient(arv)
952         arv.ApiToken = "abc123"
953         kc.SetServiceRoots(
954                 map[string]string{
955                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
956                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
957                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
958                         uuid:                          ks.url},
959                 nil,
960                 map[string]string{
961                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
962                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
963                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
964                         uuid:                          ks.url},
965         )
966
967         r, n, _, err := kc.Get(hash + "+K@" + uuid)
968         c.Assert(err, IsNil)
969         c.Check(n, Equals, int64(3))
970
971         content, err := ioutil.ReadAll(r)
972         c.Check(err, IsNil)
973         c.Check(content, DeepEquals, []byte("foo"))
974         c.Check(r.Close(), IsNil)
975 }
976
977 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
978         uuid := "zzzzz-bi6l4-123451234512345"
979         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
980
981         ksLocal := RunFakeKeepServer(StubGetHandler{
982                 c,
983                 hash + "+K@" + uuid,
984                 "abc123",
985                 http.StatusOK,
986                 []byte("foo")})
987         defer ksLocal.listener.Close()
988         ksGateway := RunFakeKeepServer(StubGetHandler{
989                 c,
990                 hash + "+K@" + uuid,
991                 "abc123",
992                 http.StatusInternalServerError,
993                 []byte("Error")})
994         defer ksGateway.listener.Close()
995
996         arv, err := arvadosclient.MakeArvadosClient()
997         c.Check(err, IsNil)
998         kc, _ := MakeKeepClient(arv)
999         arv.ApiToken = "abc123"
1000         kc.SetServiceRoots(
1001                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
1002                 nil,
1003                 map[string]string{uuid: ksGateway.url})
1004
1005         r, n, _, err := kc.Get(hash + "+K@" + uuid)
1006         c.Assert(err, IsNil)
1007         c.Check(n, Equals, int64(3))
1008
1009         content, err := ioutil.ReadAll(r)
1010         c.Check(err, IsNil)
1011         c.Check(content, DeepEquals, []byte("foo"))
1012         c.Check(r.Close(), IsNil)
1013 }
1014
1015 type BarHandler struct {
1016         handled chan string
1017 }
1018
1019 func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1020         resp.Write([]byte("bar"))
1021         h.handled <- fmt.Sprintf("http://%s", req.Host)
1022 }
1023
1024 func (s *StandaloneSuite) TestChecksum(c *C) {
1025         foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1026         barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
1027
1028         st := BarHandler{make(chan string, 1)}
1029
1030         ks := RunFakeKeepServer(st)
1031         defer ks.listener.Close()
1032
1033         arv, err := arvadosclient.MakeArvadosClient()
1034         c.Check(err, IsNil)
1035         kc, _ := MakeKeepClient(arv)
1036         arv.ApiToken = "abc123"
1037         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1038
1039         r, n, _, err := kc.Get(barhash)
1040         if c.Check(err, IsNil) {
1041                 _, err = ioutil.ReadAll(r)
1042                 c.Check(n, Equals, int64(3))
1043                 c.Check(err, IsNil)
1044         }
1045
1046         select {
1047         case <-st.handled:
1048         case <-time.After(time.Second):
1049                 c.Fatal("timed out")
1050         }
1051
1052         r, n, _, err = kc.Get(foohash)
1053         if err == nil {
1054                 buf, readerr := ioutil.ReadAll(r)
1055                 c.Logf("%q", buf)
1056                 err = readerr
1057         }
1058         c.Check(err, Equals, BadChecksum)
1059
1060         select {
1061         case <-st.handled:
1062         case <-time.After(time.Second):
1063                 c.Fatal("timed out")
1064         }
1065 }
1066
1067 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
1068         content := []byte("waz")
1069         hash := fmt.Sprintf("%x+3", md5.Sum(content))
1070
1071         fh := Error404Handler{
1072                 make(chan string, 4)}
1073
1074         st := StubGetHandler{
1075                 c,
1076                 hash,
1077                 "abc123",
1078                 http.StatusOK,
1079                 content}
1080
1081         arv, err := arvadosclient.MakeArvadosClient()
1082         c.Check(err, IsNil)
1083         kc, _ := MakeKeepClient(arv)
1084         arv.ApiToken = "abc123"
1085         localRoots := make(map[string]string)
1086         writableLocalRoots := make(map[string]string)
1087
1088         ks1 := RunSomeFakeKeepServers(st, 1)
1089         ks2 := RunSomeFakeKeepServers(fh, 4)
1090
1091         for i, k := range ks1 {
1092                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1093                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1094                 defer k.listener.Close()
1095         }
1096         for i, k := range ks2 {
1097                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1098                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1099                 defer k.listener.Close()
1100         }
1101
1102         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1103         kc.Retries = 0
1104
1105         // This test works only if one of the failing services is
1106         // attempted before the succeeding service. Otherwise,
1107         // <-fh.handled below will just hang! (Probe order depends on
1108         // the choice of block content "waz" and the UUIDs of the fake
1109         // servers, so we just tried different strings until we found
1110         // an example that passes this Assert.)
1111         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
1112
1113         r, n, _, err := kc.Get(hash)
1114
1115         select {
1116         case <-fh.handled:
1117         case <-time.After(time.Second):
1118                 c.Fatal("timed out")
1119         }
1120         c.Assert(err, IsNil)
1121         c.Check(n, Equals, int64(3))
1122
1123         readContent, err2 := ioutil.ReadAll(r)
1124         c.Check(err2, IsNil)
1125         c.Check(readContent, DeepEquals, content)
1126         c.Check(r.Close(), IsNil)
1127 }
1128
1129 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
1130         content := []byte("TestPutGetHead")
1131
1132         arv, err := arvadosclient.MakeArvadosClient()
1133         c.Check(err, IsNil)
1134         kc, err := MakeKeepClient(arv)
1135         c.Assert(err, IsNil)
1136
1137         hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content))
1138
1139         {
1140                 n, _, err := kc.Ask(hash)
1141                 c.Check(err, Equals, BlockNotFound)
1142                 c.Check(n, Equals, int64(0))
1143         }
1144         {
1145                 hash2, replicas, err := kc.PutB(content)
1146                 c.Check(err, IsNil)
1147                 c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`)
1148                 c.Check(replicas, Equals, 2)
1149         }
1150         {
1151                 r, n, _, err := kc.Get(hash)
1152                 c.Check(err, IsNil)
1153                 c.Check(n, Equals, int64(len(content)))
1154                 if c.Check(r, NotNil) {
1155                         readContent, err := ioutil.ReadAll(r)
1156                         c.Check(err, IsNil)
1157                         if c.Check(len(readContent), Equals, len(content)) {
1158                                 c.Check(readContent, DeepEquals, content)
1159                         }
1160                         c.Check(r.Close(), IsNil)
1161                 }
1162         }
1163         {
1164                 n, url2, err := kc.Ask(hash)
1165                 c.Check(err, IsNil)
1166                 c.Check(n, Equals, int64(len(content)))
1167                 c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E")
1168         }
1169         {
1170                 loc, err := kc.LocalLocator(hash)
1171                 c.Check(err, IsNil)
1172                 c.Assert(len(loc) >= 32, Equals, true)
1173                 c.Check(loc[:32], Equals, hash[:32])
1174         }
1175         {
1176                 content := []byte("the perth county conspiracy")
1177                 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1178                 c.Check(loc, Equals, "")
1179                 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1180                 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
1181         }
1182 }
1183
1184 type StubProxyHandler struct {
1185         handled chan string
1186 }
1187
1188 func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1189         resp.Header().Set("X-Keep-Replicas-Stored", "2")
1190         h.handled <- fmt.Sprintf("http://%s", req.Host)
1191 }
1192
1193 func (s *StandaloneSuite) TestPutProxy(c *C) {
1194         st := StubProxyHandler{make(chan string, 1)}
1195
1196         arv, err := arvadosclient.MakeArvadosClient()
1197         c.Check(err, IsNil)
1198         kc, _ := MakeKeepClient(arv)
1199
1200         kc.Want_replicas = 2
1201         arv.ApiToken = "abc123"
1202         localRoots := make(map[string]string)
1203         writableLocalRoots := make(map[string]string)
1204
1205         ks1 := RunSomeFakeKeepServers(st, 1)
1206
1207         for i, k := range ks1 {
1208                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1209                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1210                 defer k.listener.Close()
1211         }
1212
1213         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1214
1215         _, replicas, err := kc.PutB([]byte("foo"))
1216         <-st.handled
1217
1218         c.Check(err, IsNil)
1219         c.Check(replicas, Equals, 2)
1220 }
1221
1222 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1223         st := StubProxyHandler{make(chan string, 1)}
1224
1225         arv, err := arvadosclient.MakeArvadosClient()
1226         c.Check(err, IsNil)
1227         kc, _ := MakeKeepClient(arv)
1228
1229         kc.Want_replicas = 3
1230         arv.ApiToken = "abc123"
1231         localRoots := make(map[string]string)
1232         writableLocalRoots := make(map[string]string)
1233
1234         ks1 := RunSomeFakeKeepServers(st, 1)
1235
1236         for i, k := range ks1 {
1237                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1238                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1239                 defer k.listener.Close()
1240         }
1241         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1242
1243         _, replicas, err := kc.PutB([]byte("foo"))
1244         <-st.handled
1245
1246         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1247         c.Check(replicas, Equals, 2)
1248 }
1249
1250 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1251         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1252         c.Check(err, IsNil)
1253         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1254         c.Check(l.Size, Equals, 3)
1255         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
1256 }
1257
1258 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1259         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1260         c.Check(err, IsNil)
1261         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1262         c.Check(l.Size, Equals, -1)
1263         c.Check(l.Hints, DeepEquals, []string{})
1264 }
1265
1266 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1267         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1268         c.Check(err, IsNil)
1269         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1270         c.Check(l.Size, Equals, -1)
1271         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
1272 }
1273
1274 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1275         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1276         l, err := MakeLocator(str)
1277         c.Check(err, IsNil)
1278         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1279         c.Check(l.Size, Equals, 3)
1280         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1281         c.Check(l.String(), Equals, str)
1282 }
1283
1284 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1285         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1286         c.Check(err, Equals, InvalidLocatorError)
1287 }
1288
1289 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1290         hash := Md5String("foo")
1291
1292         st := &StubPutHandler{
1293                 c:                    c,
1294                 expectPath:           hash,
1295                 expectAPIToken:       "abc123",
1296                 expectBody:           "foo",
1297                 expectStorageClass:   "default",
1298                 returnStorageClasses: "",
1299                 handled:              make(chan string, 5),
1300         }
1301
1302         arv, _ := arvadosclient.MakeArvadosClient()
1303         kc, _ := MakeKeepClient(arv)
1304
1305         kc.Want_replicas = 2
1306         arv.ApiToken = "abc123"
1307         localRoots := make(map[string]string)
1308         writableLocalRoots := make(map[string]string)
1309
1310         ks := RunSomeFakeKeepServers(st, 5)
1311
1312         for i, k := range ks {
1313                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1314                 if i == 0 {
1315                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1316                 }
1317                 defer k.listener.Close()
1318         }
1319
1320         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1321
1322         _, replicas, err := kc.PutB([]byte("foo"))
1323
1324         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1325         c.Check(replicas, Equals, 1)
1326
1327         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1328 }
1329
1330 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1331         hash := Md5String("foo")
1332
1333         st := &StubPutHandler{
1334                 c:                    c,
1335                 expectPath:           hash,
1336                 expectAPIToken:       "abc123",
1337                 expectBody:           "foo",
1338                 expectStorageClass:   "",
1339                 returnStorageClasses: "",
1340                 handled:              make(chan string, 5),
1341         }
1342
1343         arv, _ := arvadosclient.MakeArvadosClient()
1344         kc, _ := MakeKeepClient(arv)
1345
1346         kc.Want_replicas = 2
1347         arv.ApiToken = "abc123"
1348         localRoots := make(map[string]string)
1349         writableLocalRoots := make(map[string]string)
1350
1351         ks := RunSomeFakeKeepServers(st, 5)
1352
1353         for i, k := range ks {
1354                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1355                 defer k.listener.Close()
1356         }
1357
1358         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1359
1360         _, replicas, err := kc.PutB([]byte("foo"))
1361
1362         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1363         c.Check(replicas, Equals, 0)
1364 }
1365
1366 type StubGetIndexHandler struct {
1367         c              *C
1368         expectPath     string
1369         expectAPIToken string
1370         httpStatus     int
1371         body           []byte
1372 }
1373
1374 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1375         h.c.Check(req.URL.Path, Equals, h.expectPath)
1376         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1377         resp.WriteHeader(h.httpStatus)
1378         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1379         resp.Write(h.body)
1380 }
1381
1382 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1383         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1384
1385         st := StubGetIndexHandler{
1386                 c,
1387                 "/index",
1388                 "abc123",
1389                 http.StatusOK,
1390                 []byte(hash + " 1443559274\n\n")}
1391
1392         ks := RunFakeKeepServer(st)
1393         defer ks.listener.Close()
1394
1395         arv, err := arvadosclient.MakeArvadosClient()
1396         c.Assert(err, IsNil)
1397         kc, err := MakeKeepClient(arv)
1398         c.Assert(err, IsNil)
1399         arv.ApiToken = "abc123"
1400         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1401
1402         r, err := kc.GetIndex("x", "")
1403         c.Check(err, IsNil)
1404
1405         content, err2 := ioutil.ReadAll(r)
1406         c.Check(err2, IsNil)
1407         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1408 }
1409
1410 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1411         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1412
1413         st := StubGetIndexHandler{
1414                 c,
1415                 "/index/" + hash[0:3],
1416                 "abc123",
1417                 http.StatusOK,
1418                 []byte(hash + " 1443559274\n\n")}
1419
1420         ks := RunFakeKeepServer(st)
1421         defer ks.listener.Close()
1422
1423         arv, err := arvadosclient.MakeArvadosClient()
1424         c.Check(err, IsNil)
1425         kc, _ := MakeKeepClient(arv)
1426         arv.ApiToken = "abc123"
1427         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1428
1429         r, err := kc.GetIndex("x", hash[0:3])
1430         c.Assert(err, IsNil)
1431
1432         content, err2 := ioutil.ReadAll(r)
1433         c.Check(err2, IsNil)
1434         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1435 }
1436
1437 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1438         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1439
1440         st := StubGetIndexHandler{
1441                 c,
1442                 "/index/" + hash[0:3],
1443                 "abc123",
1444                 http.StatusOK,
1445                 []byte(hash)}
1446
1447         ks := RunFakeKeepServer(st)
1448         defer ks.listener.Close()
1449
1450         arv, err := arvadosclient.MakeArvadosClient()
1451         c.Check(err, IsNil)
1452         kc, _ := MakeKeepClient(arv)
1453         arv.ApiToken = "abc123"
1454         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1455
1456         _, err = kc.GetIndex("x", hash[0:3])
1457         c.Check(err, Equals, ErrIncompleteIndex)
1458 }
1459
1460 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1461         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1462
1463         st := StubGetIndexHandler{
1464                 c,
1465                 "/index/" + hash[0:3],
1466                 "abc123",
1467                 http.StatusOK,
1468                 []byte(hash)}
1469
1470         ks := RunFakeKeepServer(st)
1471         defer ks.listener.Close()
1472
1473         arv, err := arvadosclient.MakeArvadosClient()
1474         c.Check(err, IsNil)
1475         kc, _ := MakeKeepClient(arv)
1476         arv.ApiToken = "abc123"
1477         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1478
1479         _, err = kc.GetIndex("y", hash[0:3])
1480         c.Check(err, Equals, ErrNoSuchKeepServer)
1481 }
1482
1483 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1484         st := StubGetIndexHandler{
1485                 c,
1486                 "/index/abcd",
1487                 "abc123",
1488                 http.StatusOK,
1489                 []byte("\n")}
1490
1491         ks := RunFakeKeepServer(st)
1492         defer ks.listener.Close()
1493
1494         arv, err := arvadosclient.MakeArvadosClient()
1495         c.Check(err, IsNil)
1496         kc, _ := MakeKeepClient(arv)
1497         arv.ApiToken = "abc123"
1498         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1499
1500         r, err := kc.GetIndex("x", "abcd")
1501         c.Check(err, IsNil)
1502
1503         content, err2 := ioutil.ReadAll(r)
1504         c.Check(err2, IsNil)
1505         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1506 }
1507
1508 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1509         defer func(origDefault, origMinimum time.Duration) {
1510                 DefaultRetryDelay = origDefault
1511                 MinimumRetryDelay = origMinimum
1512         }(DefaultRetryDelay, MinimumRetryDelay)
1513         DefaultRetryDelay = time.Second / 8
1514         MinimumRetryDelay = time.Millisecond
1515
1516         for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
1517                 c.Logf("=== initial delay %v", delay)
1518
1519                 st := &FailThenSucceedHandler{
1520                         morefails: 5, // handler will fail 6x in total, 3 for each server
1521                         handled:   make(chan string, 10),
1522                         successhandler: &StubPutHandler{
1523                                 c:                    c,
1524                                 expectPath:           Md5String("foo"),
1525                                 expectAPIToken:       "abc123",
1526                                 expectBody:           "foo",
1527                                 expectStorageClass:   "default",
1528                                 returnStorageClasses: "",
1529                                 handled:              make(chan string, 5),
1530                         },
1531                 }
1532
1533                 arv, _ := arvadosclient.MakeArvadosClient()
1534                 kc, _ := MakeKeepClient(arv)
1535                 kc.Retries = 3
1536                 kc.RetryDelay = delay
1537                 kc.DiskCacheSize = DiskCacheDisabled
1538                 kc.Want_replicas = 2
1539
1540                 arv.ApiToken = "abc123"
1541                 localRoots := make(map[string]string)
1542                 writableLocalRoots := make(map[string]string)
1543
1544                 ks := RunSomeFakeKeepServers(st, 2)
1545
1546                 for i, k := range ks {
1547                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1548                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1549                         defer k.listener.Close()
1550                 }
1551
1552                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1553
1554                 t0 := time.Now()
1555                 hash, replicas, err := kc.PutB([]byte("foo"))
1556
1557                 c.Check(err, IsNil)
1558                 c.Check(hash, Equals, "")
1559                 c.Check(replicas, Equals, 2)
1560                 elapsed := time.Since(t0)
1561
1562                 nonsleeptime := time.Second / 10
1563                 expect := kc.RetryDelay
1564                 if expect == 0 {
1565                         expect = DefaultRetryDelay
1566                 }
1567                 min := MinimumRetryDelay * 3
1568                 max := expect + expect*2 + expect*2*2
1569                 max += nonsleeptime
1570                 c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
1571                 c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
1572         }
1573 }
1574
1575 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1576         arv, err := arvadosclient.MakeArvadosClient()
1577         c.Assert(err, IsNil)
1578
1579         // Add an additional "testblobstore" keepservice
1580         blobKeepService := make(arvadosclient.Dict)
1581         err = arv.Create("keep_services",
1582                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1583                         "service_host": "localhost",
1584                         "service_port": "21321",
1585                         "service_type": "testblobstore"}},
1586                 &blobKeepService)
1587         c.Assert(err, IsNil)
1588         defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1589         RefreshServiceDiscovery()
1590
1591         // Make a keepclient and ensure that the testblobstore is included
1592         kc, err := MakeKeepClient(arv)
1593         c.Assert(err, IsNil)
1594
1595         // verify kc.LocalRoots
1596         c.Check(len(kc.LocalRoots()), Equals, 3)
1597         for _, root := range kc.LocalRoots() {
1598                 c.Check(root, Matches, "http://localhost:\\d+")
1599         }
1600         c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1601
1602         // verify kc.GatewayRoots
1603         c.Check(len(kc.GatewayRoots()), Equals, 3)
1604         for _, root := range kc.GatewayRoots() {
1605                 c.Check(root, Matches, "http://localhost:\\d+")
1606         }
1607         c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1608
1609         // verify kc.WritableLocalRoots
1610         c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1611         for _, root := range kc.WritableLocalRoots() {
1612                 c.Check(root, Matches, "http://localhost:\\d+")
1613         }
1614         c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1615
1616         c.Assert(kc.replicasPerService, Equals, 0)
1617         c.Assert(kc.foundNonDiskSvc, Equals, true)
1618         c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
1619 }
1620
1621 func (s *StandaloneSuite) TestDelayCalculator(c *C) {
1622         defer func(origDefault, origMinimum time.Duration) {
1623                 DefaultRetryDelay = origDefault
1624                 MinimumRetryDelay = origMinimum
1625         }(DefaultRetryDelay, MinimumRetryDelay)
1626
1627         checkInterval := func(d, min, max time.Duration) {
1628                 c.Check(d >= min, Equals, true)
1629                 c.Check(d <= max, Equals, true)
1630         }
1631
1632         MinimumRetryDelay = time.Second / 2
1633         DefaultRetryDelay = time.Second
1634         dc := delayCalculator{InitialMaxDelay: 0}
1635         checkInterval(dc.Next(), time.Second/2, time.Second)
1636         checkInterval(dc.Next(), time.Second/2, time.Second*2)
1637         checkInterval(dc.Next(), time.Second/2, time.Second*4)
1638         checkInterval(dc.Next(), time.Second/2, time.Second*8)
1639         checkInterval(dc.Next(), time.Second/2, time.Second*10)
1640         checkInterval(dc.Next(), time.Second/2, time.Second*10)
1641
1642         // Enforce non-zero InitialMaxDelay
1643         dc = delayCalculator{InitialMaxDelay: time.Second}
1644         checkInterval(dc.Next(), time.Second/2, time.Second*2)
1645         checkInterval(dc.Next(), time.Second/2, time.Second*4)
1646         checkInterval(dc.Next(), time.Second/2, time.Second*8)
1647         checkInterval(dc.Next(), time.Second/2, time.Second*16)
1648         checkInterval(dc.Next(), time.Second/2, time.Second*20)
1649         checkInterval(dc.Next(), time.Second/2, time.Second*20)
1650
1651         // Enforce MinimumRetryDelay
1652         dc = delayCalculator{InitialMaxDelay: time.Millisecond}
1653         checkInterval(dc.Next(), time.Second/2, time.Second/2)
1654         checkInterval(dc.Next(), time.Second/2, time.Second)
1655         checkInterval(dc.Next(), time.Second/2, time.Second*2)
1656         checkInterval(dc.Next(), time.Second/2, time.Second*4)
1657         checkInterval(dc.Next(), time.Second/2, time.Second*8)
1658         checkInterval(dc.Next(), time.Second/2, time.Second*10)
1659         checkInterval(dc.Next(), time.Second/2, time.Second*10)
1660
1661         // If InitialMaxDelay is less than MinimumRetryDelay/10, then
1662         // delay is always MinimumRetryDelay.
1663         dc = delayCalculator{InitialMaxDelay: time.Millisecond}
1664         for i := 0; i < 20; i++ {
1665                 c.Check(dc.Next(), Equals, time.Second/2)
1666         }
1667 }