Merge branch '22027-remove-themes-for-rails' refs #22027
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "net"
15         "net/http"
16         "os"
17         "strings"
18         "sync"
19         "sync/atomic"
20         "testing"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/arvadostest"
26         . "gopkg.in/check.v1"
27 )
28
29 func Test(t *testing.T) {
30         DefaultRetryDelay = 50 * time.Millisecond
31         TestingT(t)
32 }
33
34 // Gocheck boilerplate
35 var _ = Suite(&ServerRequiredSuite{})
36 var _ = Suite(&StandaloneSuite{})
37
// ServerRequiredSuite holds the tests that need Keep servers running.
// It carries no state of its own.
type ServerRequiredSuite struct{}
40
// StandaloneSuite holds the standalone tests. Its fields remember the
// package-level retry delays captured in SetUpTest so that TearDownTest
// can restore them.
type StandaloneSuite struct {
	origDefaultRetryDelay, origMinimumRetryDelay time.Duration
}
46
// origHOME is the value of $HOME at package initialization. The suites
// point HOME at temporary directories and use this to put it back.
var origHOME = os.Getenv("HOME")
48
49 func (s *StandaloneSuite) SetUpTest(c *C) {
50         RefreshServiceDiscovery()
51         // Prevent cache state from leaking between test cases
52         os.Setenv("HOME", c.MkDir())
53         s.origDefaultRetryDelay = DefaultRetryDelay
54         s.origMinimumRetryDelay = MinimumRetryDelay
55 }
56
57 func (s *StandaloneSuite) TearDownTest(c *C) {
58         os.Setenv("HOME", origHOME)
59         DefaultRetryDelay = s.origDefaultRetryDelay
60         MinimumRetryDelay = s.origMinimumRetryDelay
61 }
62
// pythonDir returns "<cwd>/../../python/tests", where <cwd> is the
// current working directory. The path is not cleaned, and an error from
// os.Getwd is ignored (an empty <cwd> is used in that case).
func pythonDir() string {
	const relative = "../../python/tests"
	workDir, _ := os.Getwd()
	return fmt.Sprintf("%s/%s", workDir, relative)
}
67
68 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
69         arvadostest.StartKeep(2, false)
70 }
71
72 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
73         arvadostest.StopKeep(2)
74         os.Setenv("HOME", origHOME)
75 }
76
77 func (s *ServerRequiredSuite) SetUpTest(c *C) {
78         RefreshServiceDiscovery()
79         // Prevent cache state from leaking between test cases
80         os.Setenv("HOME", c.MkDir())
81 }
82
83 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
84         arv, err := arvadosclient.MakeArvadosClient()
85         c.Assert(err, IsNil)
86
87         kc, err := MakeKeepClient(arv)
88
89         c.Assert(err, IsNil)
90         c.Check(len(kc.LocalRoots()), Equals, 2)
91         for _, root := range kc.LocalRoots() {
92                 c.Check(root, Matches, "http://localhost:\\d+")
93         }
94 }
95
96 func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) {
97         arv, err := arvadosclient.MakeArvadosClient()
98         c.Assert(err, IsNil)
99
100         cc, err := arv.ClusterConfig("StorageClasses")
101         c.Assert(err, IsNil)
102         c.Assert(cc, NotNil)
103         c.Assert(cc.(map[string]interface{})["default"], NotNil)
104
105         kc := New(arv)
106         c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"})
107 }
108
109 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
110         arv, err := arvadosclient.MakeArvadosClient()
111         c.Assert(err, IsNil)
112
113         kc, err := MakeKeepClient(arv)
114         c.Check(err, IsNil)
115         c.Assert(kc.Want_replicas, Equals, 2)
116
117         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
118         kc, err = MakeKeepClient(arv)
119         c.Check(err, IsNil)
120         c.Assert(kc.Want_replicas, Equals, 3)
121
122         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
123         kc, err = MakeKeepClient(arv)
124         c.Check(err, IsNil)
125         c.Assert(kc.Want_replicas, Equals, 1)
126 }
127
128 type StubPutHandler struct {
129         c                    *C
130         expectPath           string
131         expectAPIToken       string
132         expectBody           string
133         expectStorageClass   string
134         returnStorageClasses string
135         handled              chan string
136         requests             []*http.Request
137         mtx                  sync.Mutex
138 }
139
140 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
141         sph.mtx.Lock()
142         sph.requests = append(sph.requests, req)
143         sph.mtx.Unlock()
144         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
145         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("Bearer %s", sph.expectAPIToken))
146         if sph.expectStorageClass != "*" {
147                 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
148         }
149         body, err := ioutil.ReadAll(req.Body)
150         sph.c.Check(err, IsNil)
151         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
152         resp.Header().Set("X-Keep-Replicas-Stored", "1")
153         if sph.returnStorageClasses != "" {
154                 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
155         }
156         resp.WriteHeader(200)
157         sph.handled <- fmt.Sprintf("http://%s", req.Host)
158 }
159
160 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
161         var err error
162         // If we don't explicitly bind it to localhost, ks.listener.Addr() will
163         // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
164         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
165         if err != nil {
166                 panic("Could not listen on any port")
167         }
168         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
169         go http.Serve(ks.listener, st)
170         return
171 }
172
173 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
174         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
175
176         ks := RunFakeKeepServer(st)
177         defer ks.listener.Close()
178
179         arv, _ := arvadosclient.MakeArvadosClient()
180         arv.ApiToken = "abc123"
181
182         kc, _ := MakeKeepClient(arv)
183
184         reader, writer := io.Pipe()
185         uploadStatusChan := make(chan uploadStatus)
186
187         f(kc, ks.url, reader, writer, uploadStatusChan)
188 }
189
190 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
191         st := &StubPutHandler{
192                 c:                    c,
193                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
194                 expectAPIToken:       "abc123",
195                 expectBody:           "foo",
196                 expectStorageClass:   "",
197                 returnStorageClasses: "default=1",
198                 handled:              make(chan string),
199         }
200
201         UploadToStubHelper(c, st,
202                 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
203                         go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
204
205                         writer.Write([]byte("foo"))
206                         writer.Close()
207
208                         <-st.handled
209                         status := <-uploadStatusChan
210                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
211                 })
212 }
213
214 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
215         st := &StubPutHandler{
216                 c:                    c,
217                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
218                 expectAPIToken:       "abc123",
219                 expectBody:           "foo",
220                 expectStorageClass:   "",
221                 returnStorageClasses: "default=1",
222                 handled:              make(chan string),
223         }
224
225         UploadToStubHelper(c, st,
226                 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
227                         go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
228
229                         <-st.handled
230
231                         status := <-uploadStatusChan
232                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
233                 })
234 }
235
236 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
237         for _, trial := range []struct {
238                 respHeader string
239                 expectMap  map[string]int
240         }{
241                 {"", nil},
242                 {"foo=1", map[string]int{"foo": 1}},
243                 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
244                 {" =foo=1 ", nil},
245                 {"foo", nil},
246         } {
247                 st := &StubPutHandler{
248                         c:                    c,
249                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
250                         expectAPIToken:       "abc123",
251                         expectBody:           "foo",
252                         expectStorageClass:   "",
253                         returnStorageClasses: trial.respHeader,
254                         handled:              make(chan string),
255                 }
256
257                 UploadToStubHelper(c, st,
258                         func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
259                                 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
260
261                                 writer.Write([]byte("foo"))
262                                 writer.Close()
263
264                                 <-st.handled
265                                 status := <-uploadStatusChan
266                                 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
267                         })
268         }
269 }
270
271 func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) {
272         nServers := 5
273         for _, trial := range []struct {
274                 replicas      int
275                 clientClasses []string
276                 putClasses    []string
277                 minRequests   int
278                 maxRequests   int
279                 success       bool
280         }{
281                 // Talking to an older cluster (no default storage classes exported
282                 // config) and no other additional storage classes requirements.
283                 {1, nil, nil, 1, 1, true},
284                 {2, nil, nil, 2, 2, true},
285                 {3, nil, nil, 3, 3, true},
286                 {nServers*2 + 1, nil, nil, nServers, nServers, false},
287
288                 {1, []string{"class1"}, nil, 1, 1, true},
289                 {2, []string{"class1"}, nil, 2, 2, true},
290                 {3, []string{"class1"}, nil, 3, 3, true},
291                 {1, []string{"class1", "class2"}, nil, 1, 1, true},
292                 {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
293
294                 {1, nil, []string{"class1"}, 1, 1, true},
295                 {2, nil, []string{"class1"}, 2, 2, true},
296                 {3, nil, []string{"class1"}, 3, 3, true},
297                 {1, nil, []string{"class1", "class2"}, 1, 1, true},
298                 {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false},
299         } {
300                 c.Logf("%+v", trial)
301                 st := &StubPutHandler{
302                         c:                    c,
303                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
304                         expectAPIToken:       "abc123",
305                         expectBody:           "foo",
306                         expectStorageClass:   "*",
307                         returnStorageClasses: "", // Simulate old cluster without SC keep support
308                         handled:              make(chan string, 100),
309                 }
310                 ks := RunSomeFakeKeepServers(st, nServers)
311                 arv, _ := arvadosclient.MakeArvadosClient()
312                 kc, _ := MakeKeepClient(arv)
313                 kc.Want_replicas = trial.replicas
314                 kc.StorageClasses = trial.clientClasses
315                 kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults
316                 arv.ApiToken = "abc123"
317                 localRoots := make(map[string]string)
318                 writableLocalRoots := make(map[string]string)
319                 for i, k := range ks {
320                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
321                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
322                         defer k.listener.Close()
323                 }
324                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
325
326                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
327                         Data:           []byte("foo"),
328                         StorageClasses: trial.putClasses,
329                 })
330                 if trial.success {
331                         c.Check(err, IsNil)
332                 } else {
333                         c.Check(err, NotNil)
334                 }
335                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
336                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
337                 if trial.clientClasses == nil && trial.putClasses == nil {
338                         c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "")
339                 }
340         }
341 }
342
343 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
344         nServers := 5
345         for _, trial := range []struct {
346                 replicas       int
347                 defaultClasses []string
348                 clientClasses  []string // clientClasses takes precedence over defaultClasses
349                 putClasses     []string // putClasses takes precedence over clientClasses
350                 minRequests    int
351                 maxRequests    int
352                 success        bool
353         }{
354                 {1, []string{"class1"}, nil, nil, 1, 1, true},
355                 {2, []string{"class1"}, nil, nil, 1, 2, true},
356                 {3, []string{"class1"}, nil, nil, 2, 3, true},
357                 {1, []string{"class1", "class2"}, nil, nil, 1, 1, true},
358
359                 // defaultClasses doesn't matter when any of the others is specified.
360                 {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true},
361                 {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true},
362                 {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true},
363                 {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true},
364                 {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true},
365                 {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true},
366                 {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
367                 {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
368                 {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false},
369                 {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false},
370                 {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false},
371                 {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false},
372         } {
373                 c.Logf("%+v", trial)
374                 st := &StubPutHandler{
375                         c:                    c,
376                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
377                         expectAPIToken:       "abc123",
378                         expectBody:           "foo",
379                         expectStorageClass:   "*",
380                         returnStorageClasses: "class1=2, class2=2",
381                         handled:              make(chan string, 100),
382                 }
383                 ks := RunSomeFakeKeepServers(st, nServers)
384                 arv, _ := arvadosclient.MakeArvadosClient()
385                 kc, _ := MakeKeepClient(arv)
386                 kc.Want_replicas = trial.replicas
387                 kc.StorageClasses = trial.clientClasses
388                 kc.DefaultStorageClasses = trial.defaultClasses
389                 arv.ApiToken = "abc123"
390                 localRoots := make(map[string]string)
391                 writableLocalRoots := make(map[string]string)
392                 for i, k := range ks {
393                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
394                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
395                         defer k.listener.Close()
396                 }
397                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
398
399                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
400                         Data:           []byte("foo"),
401                         StorageClasses: trial.putClasses,
402                 })
403                 if trial.success {
404                         c.Check(err, IsNil)
405                 } else {
406                         c.Check(err, NotNil)
407                 }
408                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
409                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
410                 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) {
411                         // Max concurrency should be 1. First request
412                         // should have succeeded for class1. Second
413                         // request should only ask for class404.
414                         c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404")
415                 }
416         }
417 }
418
// FailHandler is a stub handler that answers every request with HTTP 500
// and then sends "http://<request Host>" on handled.
type FailHandler struct {
	handled chan string
}

// ServeHTTP writes a 500 status and reports the request on fh.handled.
// The send blocks until the channel has room or a receiver is ready.
func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	servedBy := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- servedBy
}
427
// FailThenSucceedHandler is a stub handler that answers the first
// morefails+1 requests with HTTP 500 and passes every later request to
// successhandler. It records the X-Request-Id header of every request
// it sees.
type FailThenSucceedHandler struct {
	morefails      int // fail 1 + this many times before succeeding
	handled        chan string
	count          atomic.Int64
	successhandler http.Handler
	reqIDs         []string
}

// ServeHTTP appends the request's X-Request-Id to fh.reqIDs, bumps the
// request counter, and either fails the request with a 500 (reporting
// "http://<Host>" on fh.handled) or delegates to fh.successhandler.
func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))

	attempt := int(fh.count.Add(1))
	if attempt > fh.morefails+1 {
		fh.successhandler.ServeHTTP(resp, req)
		return
	}

	resp.WriteHeader(http.StatusInternalServerError)
	fh.handled <- fmt.Sprintf("http://%s", req.Host)
}
445
// Error404Handler is a stub handler that answers every request with
// HTTP 404 and then sends "http://<request Host>" on handled.
type Error404Handler struct {
	handled chan string
}

// ServeHTTP writes a 404 status and reports the request on fh.handled.
// The send blocks until the channel has room or a receiver is ready.
func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusNotFound)
	servedBy := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- servedBy
}
454
455 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
456         st := FailHandler{
457                 make(chan string)}
458
459         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
460
461         UploadToStubHelper(c, st,
462                 func(kc *KeepClient, url string, reader io.ReadCloser,
463                         writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
464
465                         go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
466
467                         writer.Write([]byte("foo"))
468                         writer.Close()
469
470                         <-st.handled
471
472                         status := <-uploadStatusChan
473                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
474                         c.Check(status.statusCode, Equals, 500)
475                 })
476 }
477
// KeepServer describes one fake Keep server started by RunFakeKeepServer.
type KeepServer struct {
	// listener is the socket the fake server accepts connections on;
	// tests close it when they are done with the server.
	listener net.Listener
	// url is the server's base URL, "http://<listener address>".
	url string
}
482
483 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
484         ks = make([]KeepServer, n)
485
486         for i := 0; i < n; i++ {
487                 ks[i] = RunFakeKeepServer(st)
488         }
489
490         return ks
491 }
492
493 func (s *StandaloneSuite) TestPutB(c *C) {
494         hash := Md5String("foo")
495
496         st := &StubPutHandler{
497                 c:                    c,
498                 expectPath:           hash,
499                 expectAPIToken:       "abc123",
500                 expectBody:           "foo",
501                 expectStorageClass:   "default",
502                 returnStorageClasses: "",
503                 handled:              make(chan string, 5),
504         }
505
506         arv, _ := arvadosclient.MakeArvadosClient()
507         kc, _ := MakeKeepClient(arv)
508
509         kc.Want_replicas = 2
510         arv.ApiToken = "abc123"
511         localRoots := make(map[string]string)
512         writableLocalRoots := make(map[string]string)
513
514         ks := RunSomeFakeKeepServers(st, 5)
515
516         for i, k := range ks {
517                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
518                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
519                 defer k.listener.Close()
520         }
521
522         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
523
524         kc.PutB([]byte("foo"))
525
526         shuff := NewRootSorter(
527                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
528
529         s1 := <-st.handled
530         s2 := <-st.handled
531         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
532                 (s1 == shuff[1] && s2 == shuff[0]),
533                 Equals,
534                 true)
535 }
536
537 func (s *StandaloneSuite) TestPutHR(c *C) {
538         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
539
540         st := &StubPutHandler{
541                 c:                    c,
542                 expectPath:           hash,
543                 expectAPIToken:       "abc123",
544                 expectBody:           "foo",
545                 expectStorageClass:   "default",
546                 returnStorageClasses: "",
547                 handled:              make(chan string, 5),
548         }
549
550         arv, _ := arvadosclient.MakeArvadosClient()
551         kc, _ := MakeKeepClient(arv)
552
553         kc.Want_replicas = 2
554         arv.ApiToken = "abc123"
555         localRoots := make(map[string]string)
556         writableLocalRoots := make(map[string]string)
557
558         ks := RunSomeFakeKeepServers(st, 5)
559
560         for i, k := range ks {
561                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
562                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
563                 defer k.listener.Close()
564         }
565
566         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
567
568         kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)
569
570         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
571
572         s1 := <-st.handled
573         s2 := <-st.handled
574
575         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
576                 (s1 == shuff[1] && s2 == shuff[0]),
577                 Equals,
578                 true)
579 }
580
581 func (s *StandaloneSuite) TestPutWithFail(c *C) {
582         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
583
584         st := &StubPutHandler{
585                 c:                    c,
586                 expectPath:           hash,
587                 expectAPIToken:       "abc123",
588                 expectBody:           "foo",
589                 expectStorageClass:   "default",
590                 returnStorageClasses: "",
591                 handled:              make(chan string, 4),
592         }
593
594         fh := FailHandler{
595                 make(chan string, 1)}
596
597         arv, err := arvadosclient.MakeArvadosClient()
598         c.Check(err, IsNil)
599         kc, _ := MakeKeepClient(arv)
600
601         kc.Want_replicas = 2
602         arv.ApiToken = "abc123"
603         localRoots := make(map[string]string)
604         writableLocalRoots := make(map[string]string)
605
606         ks1 := RunSomeFakeKeepServers(st, 4)
607         ks2 := RunSomeFakeKeepServers(fh, 1)
608
609         for i, k := range ks1 {
610                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
611                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
612                 defer k.listener.Close()
613         }
614         for i, k := range ks2 {
615                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
616                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
617                 defer k.listener.Close()
618         }
619
620         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
621
622         shuff := NewRootSorter(
623                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
624         c.Logf("%+v", shuff)
625
626         phash, replicas, err := kc.PutB([]byte("foo"))
627
628         <-fh.handled
629
630         c.Check(err, IsNil)
631         c.Check(phash, Equals, "")
632         c.Check(replicas, Equals, 2)
633
634         s1 := <-st.handled
635         s2 := <-st.handled
636
637         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
638                 (s1 == shuff[2] && s2 == shuff[1]),
639                 Equals,
640                 true)
641 }
642
643 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
644         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
645
646         st := &StubPutHandler{
647                 c:                    c,
648                 expectPath:           hash,
649                 expectAPIToken:       "abc123",
650                 expectBody:           "foo",
651                 expectStorageClass:   "default",
652                 returnStorageClasses: "",
653                 handled:              make(chan string, 1),
654         }
655
656         fh := FailHandler{
657                 make(chan string, 4)}
658
659         arv, err := arvadosclient.MakeArvadosClient()
660         c.Check(err, IsNil)
661         kc, _ := MakeKeepClient(arv)
662
663         kc.Want_replicas = 2
664         kc.Retries = 0
665         arv.ApiToken = "abc123"
666         localRoots := make(map[string]string)
667         writableLocalRoots := make(map[string]string)
668
669         ks1 := RunSomeFakeKeepServers(st, 1)
670         ks2 := RunSomeFakeKeepServers(fh, 4)
671
672         for i, k := range ks1 {
673                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
674                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
675                 defer k.listener.Close()
676         }
677         for i, k := range ks2 {
678                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
679                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
680                 defer k.listener.Close()
681         }
682
683         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
684
685         _, replicas, err := kc.PutB([]byte("foo"))
686
687         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
688         c.Check(replicas, Equals, 1)
689         c.Check(<-st.handled, Equals, ks1[0].url)
690 }
691
692 type StubGetHandler struct {
693         c              *C
694         expectPath     string
695         expectAPIToken string
696         httpStatus     int
697         body           []byte
698 }
699
700 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
701         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
702         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("Bearer %s", sgh.expectAPIToken))
703         resp.WriteHeader(sgh.httpStatus)
704         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
705         resp.Write(sgh.body)
706 }
707
708 func (s *StandaloneSuite) TestGet(c *C) {
709         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
710
711         st := StubGetHandler{
712                 c,
713                 hash,
714                 "abc123",
715                 http.StatusOK,
716                 []byte("foo")}
717
718         ks := RunFakeKeepServer(st)
719         defer ks.listener.Close()
720
721         arv, err := arvadosclient.MakeArvadosClient()
722         c.Check(err, IsNil)
723         kc, _ := MakeKeepClient(arv)
724         arv.ApiToken = "abc123"
725         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
726
727         r, n, _, err := kc.Get(hash)
728         c.Assert(err, IsNil)
729         c.Check(n, Equals, int64(3))
730
731         content, err2 := ioutil.ReadAll(r)
732         c.Check(err2, IsNil)
733         c.Check(content, DeepEquals, []byte("foo"))
734         c.Check(r.Close(), IsNil)
735 }
736
737 func (s *StandaloneSuite) TestGet404(c *C) {
738         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
739
740         st := Error404Handler{make(chan string, 1)}
741
742         ks := RunFakeKeepServer(st)
743         defer ks.listener.Close()
744
745         arv, err := arvadosclient.MakeArvadosClient()
746         c.Check(err, IsNil)
747         kc, _ := MakeKeepClient(arv)
748         arv.ApiToken = "abc123"
749         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
750
751         r, n, _, err := kc.Get(hash)
752         c.Check(err, Equals, BlockNotFound)
753         c.Check(n, Equals, int64(0))
754         c.Check(r, IsNil)
755 }
756
757 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
758         st := Error404Handler{make(chan string, 1)}
759
760         ks := RunFakeKeepServer(st)
761         defer ks.listener.Close()
762
763         arv, err := arvadosclient.MakeArvadosClient()
764         c.Check(err, IsNil)
765         kc, _ := MakeKeepClient(arv)
766         arv.ApiToken = "abc123"
767         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
768
769         r, n, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
770         c.Check(err, IsNil)
771         c.Check(n, Equals, int64(0))
772         c.Assert(r, NotNil)
773         buf, err := ioutil.ReadAll(r)
774         c.Check(err, IsNil)
775         c.Check(buf, DeepEquals, []byte{})
776         c.Check(r.Close(), IsNil)
777 }
778
779 func (s *StandaloneSuite) TestGetFail(c *C) {
780         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
781
782         st := FailHandler{make(chan string, 1)}
783
784         ks := RunFakeKeepServer(st)
785         defer ks.listener.Close()
786
787         arv, err := arvadosclient.MakeArvadosClient()
788         c.Check(err, IsNil)
789         kc, _ := MakeKeepClient(arv)
790         arv.ApiToken = "abc123"
791         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
792         kc.Retries = 0
793
794         r, n, _, err := kc.Get(hash)
795         errNotFound, _ := err.(*ErrNotFound)
796         if c.Check(errNotFound, NotNil) {
797                 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
798                 c.Check(errNotFound.Temporary(), Equals, true)
799         }
800         c.Check(n, Equals, int64(0))
801         c.Check(r, IsNil)
802 }
803
804 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
805         defer func(origDefault, origMinimum time.Duration) {
806                 DefaultRetryDelay = origDefault
807                 MinimumRetryDelay = origMinimum
808         }(DefaultRetryDelay, MinimumRetryDelay)
809         DefaultRetryDelay = time.Second / 8
810         MinimumRetryDelay = time.Millisecond
811
812         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
813
814         for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
815                 c.Logf("=== initial delay %v", delay)
816
817                 st := &FailThenSucceedHandler{
818                         morefails: 2,
819                         handled:   make(chan string, 4),
820                         successhandler: StubGetHandler{
821                                 c,
822                                 hash,
823                                 "abc123",
824                                 http.StatusOK,
825                                 []byte("foo")}}
826
827                 ks := RunFakeKeepServer(st)
828                 defer ks.listener.Close()
829
830                 arv, err := arvadosclient.MakeArvadosClient()
831                 c.Check(err, IsNil)
832                 kc, _ := MakeKeepClient(arv)
833                 arv.ApiToken = "abc123"
834                 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
835                 kc.Retries = 3
836                 kc.RetryDelay = delay
837                 kc.DiskCacheSize = DiskCacheDisabled
838
839                 t0 := time.Now()
840                 r, n, _, err := kc.Get(hash)
841                 c.Assert(err, IsNil)
842                 c.Check(n, Equals, int64(3))
843                 elapsed := time.Since(t0)
844
845                 nonsleeptime := time.Second / 10
846                 expect := kc.RetryDelay
847                 if expect == 0 {
848                         expect = DefaultRetryDelay
849                 }
850                 min := MinimumRetryDelay * 3
851                 max := expect + expect*2 + expect*2*2 + nonsleeptime
852                 c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
853                 c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
854
855                 content, err := ioutil.ReadAll(r)
856                 c.Check(err, IsNil)
857                 c.Check(content, DeepEquals, []byte("foo"))
858                 c.Check(r.Close(), IsNil)
859
860                 c.Logf("%q", st.reqIDs)
861                 if c.Check(st.reqIDs, Not(HasLen), 0) {
862                         for _, reqid := range st.reqIDs {
863                                 c.Check(reqid, Not(Equals), "")
864                                 c.Check(reqid, Equals, st.reqIDs[0])
865                         }
866                 }
867         }
868 }
869
870 func (s *StandaloneSuite) TestGetNetError(c *C) {
871         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
872
873         arv, err := arvadosclient.MakeArvadosClient()
874         c.Check(err, IsNil)
875         kc, _ := MakeKeepClient(arv)
876         arv.ApiToken = "abc123"
877         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
878
879         r, n, _, err := kc.Get(hash)
880         errNotFound, _ := err.(*ErrNotFound)
881         if c.Check(errNotFound, NotNil) {
882                 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
883                 c.Check(errNotFound.Temporary(), Equals, true)
884         }
885         c.Check(n, Equals, int64(0))
886         c.Check(r, IsNil)
887 }
888
889 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
890         uuid := "zzzzz-bi6l4-123451234512345"
891         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
892
893         // This one shouldn't be used:
894         ks0 := RunFakeKeepServer(StubGetHandler{
895                 c,
896                 "error if used",
897                 "abc123",
898                 http.StatusOK,
899                 []byte("foo")})
900         defer ks0.listener.Close()
901         // This one should be used:
902         ks := RunFakeKeepServer(StubGetHandler{
903                 c,
904                 hash + "+K@" + uuid,
905                 "abc123",
906                 http.StatusOK,
907                 []byte("foo")})
908         defer ks.listener.Close()
909
910         arv, err := arvadosclient.MakeArvadosClient()
911         c.Check(err, IsNil)
912         kc, _ := MakeKeepClient(arv)
913         arv.ApiToken = "abc123"
914         kc.SetServiceRoots(
915                 map[string]string{"x": ks0.url},
916                 nil,
917                 map[string]string{uuid: ks.url})
918
919         r, n, _, err := kc.Get(hash + "+K@" + uuid)
920         c.Assert(err, IsNil)
921         c.Check(n, Equals, int64(3))
922
923         content, err := ioutil.ReadAll(r)
924         c.Check(err, IsNil)
925         c.Check(content, DeepEquals, []byte("foo"))
926         c.Check(r.Close(), IsNil)
927 }
928
929 // Use a service hint to fetch from a local disk service, overriding
930 // rendezvous probe order.
931 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
932         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
933         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
934
935         // This one shouldn't be used, although it appears first in
936         // rendezvous probe order:
937         ks0 := RunFakeKeepServer(StubGetHandler{
938                 c,
939                 "error if used",
940                 "abc123",
941                 http.StatusBadGateway,
942                 nil})
943         defer ks0.listener.Close()
944         // This one should be used:
945         ks := RunFakeKeepServer(StubGetHandler{
946                 c,
947                 hash + "+K@" + uuid,
948                 "abc123",
949                 http.StatusOK,
950                 []byte("foo")})
951         defer ks.listener.Close()
952
953         arv, err := arvadosclient.MakeArvadosClient()
954         c.Check(err, IsNil)
955         kc, _ := MakeKeepClient(arv)
956         arv.ApiToken = "abc123"
957         kc.SetServiceRoots(
958                 map[string]string{
959                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
960                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
961                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
962                         uuid:                          ks.url},
963                 nil,
964                 map[string]string{
965                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
966                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
967                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
968                         uuid:                          ks.url},
969         )
970
971         r, n, _, err := kc.Get(hash + "+K@" + uuid)
972         c.Assert(err, IsNil)
973         c.Check(n, Equals, int64(3))
974
975         content, err := ioutil.ReadAll(r)
976         c.Check(err, IsNil)
977         c.Check(content, DeepEquals, []byte("foo"))
978         c.Check(r.Close(), IsNil)
979 }
980
981 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
982         uuid := "zzzzz-bi6l4-123451234512345"
983         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
984
985         ksLocal := RunFakeKeepServer(StubGetHandler{
986                 c,
987                 hash + "+K@" + uuid,
988                 "abc123",
989                 http.StatusOK,
990                 []byte("foo")})
991         defer ksLocal.listener.Close()
992         ksGateway := RunFakeKeepServer(StubGetHandler{
993                 c,
994                 hash + "+K@" + uuid,
995                 "abc123",
996                 http.StatusInternalServerError,
997                 []byte("Error")})
998         defer ksGateway.listener.Close()
999
1000         arv, err := arvadosclient.MakeArvadosClient()
1001         c.Check(err, IsNil)
1002         kc, _ := MakeKeepClient(arv)
1003         arv.ApiToken = "abc123"
1004         kc.SetServiceRoots(
1005                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
1006                 nil,
1007                 map[string]string{uuid: ksGateway.url})
1008
1009         r, n, _, err := kc.Get(hash + "+K@" + uuid)
1010         c.Assert(err, IsNil)
1011         c.Check(n, Equals, int64(3))
1012
1013         content, err := ioutil.ReadAll(r)
1014         c.Check(err, IsNil)
1015         c.Check(content, DeepEquals, []byte("foo"))
1016         c.Check(r.Close(), IsNil)
1017 }
1018
// BarHandler is an HTTP handler stub that answers every request with
// the body "bar" and then reports the request on its handled channel.
type BarHandler struct {
	// handled receives "http://" followed by the request's Host, once
	// per request served.
	handled chan string
}

// ServeHTTP writes "bar" as the response body, then sends the
// requested origin to h.handled. The send blocks until the channel
// has room or a receiver is ready.
func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	payload := []byte("bar")
	resp.Write(payload)
	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
1027
1028 func (s *StandaloneSuite) TestChecksum(c *C) {
1029         foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1030         barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
1031
1032         st := BarHandler{make(chan string, 1)}
1033
1034         ks := RunFakeKeepServer(st)
1035         defer ks.listener.Close()
1036
1037         arv, err := arvadosclient.MakeArvadosClient()
1038         c.Check(err, IsNil)
1039         kc, _ := MakeKeepClient(arv)
1040         arv.ApiToken = "abc123"
1041         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1042
1043         r, n, _, err := kc.Get(barhash)
1044         if c.Check(err, IsNil) {
1045                 _, err = ioutil.ReadAll(r)
1046                 c.Check(n, Equals, int64(3))
1047                 c.Check(err, IsNil)
1048         }
1049
1050         select {
1051         case <-st.handled:
1052         case <-time.After(time.Second):
1053                 c.Fatal("timed out")
1054         }
1055
1056         r, n, _, err = kc.Get(foohash)
1057         if err == nil {
1058                 buf, readerr := ioutil.ReadAll(r)
1059                 c.Logf("%q", buf)
1060                 err = readerr
1061         }
1062         c.Check(err, Equals, BadChecksum)
1063
1064         select {
1065         case <-st.handled:
1066         case <-time.After(time.Second):
1067                 c.Fatal("timed out")
1068         }
1069 }
1070
1071 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
1072         content := []byte("waz")
1073         hash := fmt.Sprintf("%x+3", md5.Sum(content))
1074
1075         fh := Error404Handler{
1076                 make(chan string, 4)}
1077
1078         st := StubGetHandler{
1079                 c,
1080                 hash,
1081                 "abc123",
1082                 http.StatusOK,
1083                 content}
1084
1085         arv, err := arvadosclient.MakeArvadosClient()
1086         c.Check(err, IsNil)
1087         kc, _ := MakeKeepClient(arv)
1088         arv.ApiToken = "abc123"
1089         localRoots := make(map[string]string)
1090         writableLocalRoots := make(map[string]string)
1091
1092         ks1 := RunSomeFakeKeepServers(st, 1)
1093         ks2 := RunSomeFakeKeepServers(fh, 4)
1094
1095         for i, k := range ks1 {
1096                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1097                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1098                 defer k.listener.Close()
1099         }
1100         for i, k := range ks2 {
1101                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1102                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1103                 defer k.listener.Close()
1104         }
1105
1106         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1107         kc.Retries = 0
1108
1109         // This test works only if one of the failing services is
1110         // attempted before the succeeding service. Otherwise,
1111         // <-fh.handled below will just hang! (Probe order depends on
1112         // the choice of block content "waz" and the UUIDs of the fake
1113         // servers, so we just tried different strings until we found
1114         // an example that passes this Assert.)
1115         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
1116
1117         r, n, _, err := kc.Get(hash)
1118
1119         select {
1120         case <-fh.handled:
1121         case <-time.After(time.Second):
1122                 c.Fatal("timed out")
1123         }
1124         c.Assert(err, IsNil)
1125         c.Check(n, Equals, int64(3))
1126
1127         readContent, err2 := ioutil.ReadAll(r)
1128         c.Check(err2, IsNil)
1129         c.Check(readContent, DeepEquals, content)
1130         c.Check(r.Close(), IsNil)
1131 }
1132
1133 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
1134         content := []byte("TestPutGetHead")
1135
1136         arv, err := arvadosclient.MakeArvadosClient()
1137         c.Check(err, IsNil)
1138         kc, err := MakeKeepClient(arv)
1139         c.Assert(err, IsNil)
1140
1141         hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content))
1142
1143         {
1144                 n, _, err := kc.Ask(hash)
1145                 c.Check(err, Equals, BlockNotFound)
1146                 c.Check(n, Equals, int64(0))
1147         }
1148         {
1149                 hash2, replicas, err := kc.PutB(content)
1150                 c.Check(err, IsNil)
1151                 c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`)
1152                 c.Check(replicas, Equals, 2)
1153         }
1154         {
1155                 r, n, _, err := kc.Get(hash)
1156                 c.Check(err, IsNil)
1157                 c.Check(n, Equals, int64(len(content)))
1158                 if c.Check(r, NotNil) {
1159                         readContent, err := ioutil.ReadAll(r)
1160                         c.Check(err, IsNil)
1161                         if c.Check(len(readContent), Equals, len(content)) {
1162                                 c.Check(readContent, DeepEquals, content)
1163                         }
1164                         c.Check(r.Close(), IsNil)
1165                 }
1166         }
1167         {
1168                 n, url2, err := kc.Ask(hash)
1169                 c.Check(err, IsNil)
1170                 c.Check(n, Equals, int64(len(content)))
1171                 c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E")
1172         }
1173         {
1174                 loc, err := kc.LocalLocator(hash)
1175                 c.Check(err, IsNil)
1176                 c.Assert(len(loc) >= 32, Equals, true)
1177                 c.Check(loc[:32], Equals, hash[:32])
1178         }
1179         {
1180                 content := []byte("the perth county conspiracy")
1181                 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1182                 c.Check(loc, Equals, "")
1183                 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1184                 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
1185         }
1186 }
1187
// StubProxyHandler is an HTTP handler stub used by the proxy PUT
// tests: it claims two replicas were stored for every request and
// reports each request on its handled channel.
type StubProxyHandler struct {
	// handled receives "http://" followed by the request's Host, once
	// per request served.
	handled chan string
}

// ServeHTTP sets the X-Keep-Replicas-Stored response header to "2" and
// then sends the requested origin to h.handled. It does not write a
// status code or body explicitly.
func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	hdr := resp.Header()
	hdr.Set("X-Keep-Replicas-Stored", "2")
	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
1196
1197 func (s *StandaloneSuite) TestPutProxy(c *C) {
1198         st := StubProxyHandler{make(chan string, 1)}
1199
1200         arv, err := arvadosclient.MakeArvadosClient()
1201         c.Check(err, IsNil)
1202         kc, _ := MakeKeepClient(arv)
1203
1204         kc.Want_replicas = 2
1205         arv.ApiToken = "abc123"
1206         localRoots := make(map[string]string)
1207         writableLocalRoots := make(map[string]string)
1208
1209         ks1 := RunSomeFakeKeepServers(st, 1)
1210
1211         for i, k := range ks1 {
1212                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1213                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1214                 defer k.listener.Close()
1215         }
1216
1217         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1218
1219         _, replicas, err := kc.PutB([]byte("foo"))
1220         <-st.handled
1221
1222         c.Check(err, IsNil)
1223         c.Check(replicas, Equals, 2)
1224 }
1225
1226 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1227         st := StubProxyHandler{make(chan string, 1)}
1228
1229         arv, err := arvadosclient.MakeArvadosClient()
1230         c.Check(err, IsNil)
1231         kc, _ := MakeKeepClient(arv)
1232
1233         kc.Want_replicas = 3
1234         arv.ApiToken = "abc123"
1235         localRoots := make(map[string]string)
1236         writableLocalRoots := make(map[string]string)
1237
1238         ks1 := RunSomeFakeKeepServers(st, 1)
1239
1240         for i, k := range ks1 {
1241                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1242                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1243                 defer k.listener.Close()
1244         }
1245         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1246
1247         _, replicas, err := kc.PutB([]byte("foo"))
1248         <-st.handled
1249
1250         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1251         c.Check(replicas, Equals, 2)
1252 }
1253
1254 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1255         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1256         c.Check(err, IsNil)
1257         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1258         c.Check(l.Size, Equals, 3)
1259         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
1260 }
1261
1262 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1263         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1264         c.Check(err, IsNil)
1265         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1266         c.Check(l.Size, Equals, -1)
1267         c.Check(l.Hints, DeepEquals, []string{})
1268 }
1269
1270 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1271         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1272         c.Check(err, IsNil)
1273         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1274         c.Check(l.Size, Equals, -1)
1275         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
1276 }
1277
1278 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1279         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1280         l, err := MakeLocator(str)
1281         c.Check(err, IsNil)
1282         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1283         c.Check(l.Size, Equals, 3)
1284         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1285         c.Check(l.String(), Equals, str)
1286 }
1287
1288 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1289         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1290         c.Check(err, Equals, InvalidLocatorError)
1291 }
1292
1293 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1294         hash := Md5String("foo")
1295
1296         st := &StubPutHandler{
1297                 c:                    c,
1298                 expectPath:           hash,
1299                 expectAPIToken:       "abc123",
1300                 expectBody:           "foo",
1301                 expectStorageClass:   "default",
1302                 returnStorageClasses: "",
1303                 handled:              make(chan string, 5),
1304         }
1305
1306         arv, _ := arvadosclient.MakeArvadosClient()
1307         kc, _ := MakeKeepClient(arv)
1308
1309         kc.Want_replicas = 2
1310         arv.ApiToken = "abc123"
1311         localRoots := make(map[string]string)
1312         writableLocalRoots := make(map[string]string)
1313
1314         ks := RunSomeFakeKeepServers(st, 5)
1315
1316         for i, k := range ks {
1317                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1318                 if i == 0 {
1319                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1320                 }
1321                 defer k.listener.Close()
1322         }
1323
1324         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1325
1326         _, replicas, err := kc.PutB([]byte("foo"))
1327
1328         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1329         c.Check(replicas, Equals, 1)
1330
1331         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1332 }
1333
1334 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1335         hash := Md5String("foo")
1336
1337         st := &StubPutHandler{
1338                 c:                    c,
1339                 expectPath:           hash,
1340                 expectAPIToken:       "abc123",
1341                 expectBody:           "foo",
1342                 expectStorageClass:   "",
1343                 returnStorageClasses: "",
1344                 handled:              make(chan string, 5),
1345         }
1346
1347         arv, _ := arvadosclient.MakeArvadosClient()
1348         kc, _ := MakeKeepClient(arv)
1349
1350         kc.Want_replicas = 2
1351         arv.ApiToken = "abc123"
1352         localRoots := make(map[string]string)
1353         writableLocalRoots := make(map[string]string)
1354
1355         ks := RunSomeFakeKeepServers(st, 5)
1356
1357         for i, k := range ks {
1358                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1359                 defer k.listener.Close()
1360         }
1361
1362         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1363
1364         _, replicas, err := kc.PutB([]byte("foo"))
1365
1366         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1367         c.Check(replicas, Equals, 0)
1368 }
1369
1370 type StubGetIndexHandler struct {
1371         c              *C
1372         expectPath     string
1373         expectAPIToken string
1374         httpStatus     int
1375         body           []byte
1376 }
1377
1378 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1379         h.c.Check(req.URL.Path, Equals, h.expectPath)
1380         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("Bearer %s", h.expectAPIToken))
1381         resp.WriteHeader(h.httpStatus)
1382         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1383         resp.Write(h.body)
1384 }
1385
1386 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1387         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1388
1389         st := StubGetIndexHandler{
1390                 c,
1391                 "/index",
1392                 "abc123",
1393                 http.StatusOK,
1394                 []byte(hash + " 1443559274\n\n")}
1395
1396         ks := RunFakeKeepServer(st)
1397         defer ks.listener.Close()
1398
1399         arv, err := arvadosclient.MakeArvadosClient()
1400         c.Assert(err, IsNil)
1401         kc, err := MakeKeepClient(arv)
1402         c.Assert(err, IsNil)
1403         arv.ApiToken = "abc123"
1404         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1405
1406         r, err := kc.GetIndex("x", "")
1407         c.Check(err, IsNil)
1408
1409         content, err2 := ioutil.ReadAll(r)
1410         c.Check(err2, IsNil)
1411         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1412 }
1413
1414 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1415         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1416
1417         st := StubGetIndexHandler{
1418                 c,
1419                 "/index/" + hash[0:3],
1420                 "abc123",
1421                 http.StatusOK,
1422                 []byte(hash + " 1443559274\n\n")}
1423
1424         ks := RunFakeKeepServer(st)
1425         defer ks.listener.Close()
1426
1427         arv, err := arvadosclient.MakeArvadosClient()
1428         c.Check(err, IsNil)
1429         kc, _ := MakeKeepClient(arv)
1430         arv.ApiToken = "abc123"
1431         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1432
1433         r, err := kc.GetIndex("x", hash[0:3])
1434         c.Assert(err, IsNil)
1435
1436         content, err2 := ioutil.ReadAll(r)
1437         c.Check(err2, IsNil)
1438         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1439 }
1440
1441 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1442         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1443
1444         st := StubGetIndexHandler{
1445                 c,
1446                 "/index/" + hash[0:3],
1447                 "abc123",
1448                 http.StatusOK,
1449                 []byte(hash)}
1450
1451         ks := RunFakeKeepServer(st)
1452         defer ks.listener.Close()
1453
1454         arv, err := arvadosclient.MakeArvadosClient()
1455         c.Check(err, IsNil)
1456         kc, _ := MakeKeepClient(arv)
1457         arv.ApiToken = "abc123"
1458         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1459
1460         _, err = kc.GetIndex("x", hash[0:3])
1461         c.Check(err, Equals, ErrIncompleteIndex)
1462 }
1463
1464 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1465         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1466
1467         st := StubGetIndexHandler{
1468                 c,
1469                 "/index/" + hash[0:3],
1470                 "abc123",
1471                 http.StatusOK,
1472                 []byte(hash)}
1473
1474         ks := RunFakeKeepServer(st)
1475         defer ks.listener.Close()
1476
1477         arv, err := arvadosclient.MakeArvadosClient()
1478         c.Check(err, IsNil)
1479         kc, _ := MakeKeepClient(arv)
1480         arv.ApiToken = "abc123"
1481         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1482
1483         _, err = kc.GetIndex("y", hash[0:3])
1484         c.Check(err, Equals, ErrNoSuchKeepServer)
1485 }
1486
1487 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1488         st := StubGetIndexHandler{
1489                 c,
1490                 "/index/abcd",
1491                 "abc123",
1492                 http.StatusOK,
1493                 []byte("\n")}
1494
1495         ks := RunFakeKeepServer(st)
1496         defer ks.listener.Close()
1497
1498         arv, err := arvadosclient.MakeArvadosClient()
1499         c.Check(err, IsNil)
1500         kc, _ := MakeKeepClient(arv)
1501         arv.ApiToken = "abc123"
1502         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1503
1504         r, err := kc.GetIndex("x", "abcd")
1505         c.Check(err, IsNil)
1506
1507         content, err2 := ioutil.ReadAll(r)
1508         c.Check(err2, IsNil)
1509         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1510 }
1511
1512 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1513         DefaultRetryDelay = time.Second / 8
1514         MinimumRetryDelay = time.Millisecond
1515
1516         for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
1517                 c.Logf("=== initial delay %v", delay)
1518
1519                 st := &FailThenSucceedHandler{
1520                         morefails: 5, // handler will fail 6x in total, 3 for each server
1521                         handled:   make(chan string, 10),
1522                         successhandler: &StubPutHandler{
1523                                 c:                    c,
1524                                 expectPath:           Md5String("foo"),
1525                                 expectAPIToken:       "abc123",
1526                                 expectBody:           "foo",
1527                                 expectStorageClass:   "default",
1528                                 returnStorageClasses: "",
1529                                 handled:              make(chan string, 5),
1530                         },
1531                 }
1532
1533                 arv, _ := arvadosclient.MakeArvadosClient()
1534                 kc, _ := MakeKeepClient(arv)
1535                 kc.Retries = 3
1536                 kc.RetryDelay = delay
1537                 kc.DiskCacheSize = DiskCacheDisabled
1538                 kc.Want_replicas = 2
1539
1540                 arv.ApiToken = "abc123"
1541                 localRoots := make(map[string]string)
1542                 writableLocalRoots := make(map[string]string)
1543
1544                 ks := RunSomeFakeKeepServers(st, 2)
1545
1546                 for i, k := range ks {
1547                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1548                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1549                         defer k.listener.Close()
1550                 }
1551
1552                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1553
1554                 t0 := time.Now()
1555                 hash, replicas, err := kc.PutB([]byte("foo"))
1556
1557                 c.Check(err, IsNil)
1558                 c.Check(hash, Equals, "")
1559                 c.Check(replicas, Equals, 2)
1560                 elapsed := time.Since(t0)
1561
1562                 nonsleeptime := time.Second / 10
1563                 expect := kc.RetryDelay
1564                 if expect == 0 {
1565                         expect = DefaultRetryDelay
1566                 }
1567                 min := MinimumRetryDelay * 3
1568                 max := expect + expect*2 + expect*2*2
1569                 max += nonsleeptime
1570                 checkInterval(c, elapsed, min, max)
1571         }
1572 }
1573
1574 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1575         arv, err := arvadosclient.MakeArvadosClient()
1576         c.Assert(err, IsNil)
1577
1578         // Add an additional "testblobstore" keepservice
1579         blobKeepService := make(arvadosclient.Dict)
1580         err = arv.Create("keep_services",
1581                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1582                         "service_host": "localhost",
1583                         "service_port": "21321",
1584                         "service_type": "testblobstore"}},
1585                 &blobKeepService)
1586         c.Assert(err, IsNil)
1587         defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1588         RefreshServiceDiscovery()
1589
1590         // Make a keepclient and ensure that the testblobstore is included
1591         kc, err := MakeKeepClient(arv)
1592         c.Assert(err, IsNil)
1593
1594         // verify kc.LocalRoots
1595         c.Check(len(kc.LocalRoots()), Equals, 3)
1596         for _, root := range kc.LocalRoots() {
1597                 c.Check(root, Matches, "http://localhost:\\d+")
1598         }
1599         c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1600
1601         // verify kc.GatewayRoots
1602         c.Check(len(kc.GatewayRoots()), Equals, 3)
1603         for _, root := range kc.GatewayRoots() {
1604                 c.Check(root, Matches, "http://localhost:\\d+")
1605         }
1606         c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1607
1608         // verify kc.WritableLocalRoots
1609         c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1610         for _, root := range kc.WritableLocalRoots() {
1611                 c.Check(root, Matches, "http://localhost:\\d+")
1612         }
1613         c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1614
1615         c.Assert(kc.replicasPerService, Equals, 0)
1616         c.Assert(kc.foundNonDiskSvc, Equals, true)
1617         c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
1618 }
1619
1620 func (s *StandaloneSuite) TestDelayCalculator_Default(c *C) {
1621         MinimumRetryDelay = time.Second / 2
1622         DefaultRetryDelay = time.Second
1623
1624         dc := delayCalculator{InitialMaxDelay: 0}
1625         checkInterval(c, dc.Next(), time.Second/2, time.Second)
1626         checkInterval(c, dc.Next(), time.Second/2, time.Second*2)
1627         checkInterval(c, dc.Next(), time.Second/2, time.Second*4)
1628         checkInterval(c, dc.Next(), time.Second/2, time.Second*8)
1629         checkInterval(c, dc.Next(), time.Second/2, time.Second*10)
1630         checkInterval(c, dc.Next(), time.Second/2, time.Second*10)
1631 }
1632
1633 func (s *StandaloneSuite) TestDelayCalculator_SetInitial(c *C) {
1634         MinimumRetryDelay = time.Second / 2
1635         DefaultRetryDelay = time.Second
1636
1637         dc := delayCalculator{InitialMaxDelay: time.Second * 2}
1638         checkInterval(c, dc.Next(), time.Second/2, time.Second*2)
1639         checkInterval(c, dc.Next(), time.Second/2, time.Second*4)
1640         checkInterval(c, dc.Next(), time.Second/2, time.Second*8)
1641         checkInterval(c, dc.Next(), time.Second/2, time.Second*16)
1642         checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
1643         checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
1644         checkInterval(c, dc.Next(), time.Second/2, time.Second*20)
1645 }
1646
1647 func (s *StandaloneSuite) TestDelayCalculator_EnsureSomeLongDelays(c *C) {
1648         dc := delayCalculator{InitialMaxDelay: time.Second * 5}
1649         var d time.Duration
1650         n := 4000
1651         for i := 0; i < n; i++ {
1652                 if i < 20 || i%10 == 0 {
1653                         c.Logf("i=%d, delay=%v", i, d)
1654                 }
1655                 if d = dc.Next(); d > dc.InitialMaxDelay*9 {
1656                         return
1657                 }
1658         }
1659         c.Errorf("after %d trials, never got a delay more than 90%% of expected max %d; last was %v", n, dc.InitialMaxDelay*10, d)
1660 }
1661
1662 // If InitialMaxDelay is less than MinimumRetryDelay/10, then delay is
1663 // always MinimumRetryDelay.
1664 func (s *StandaloneSuite) TestDelayCalculator_InitialLessThanMinimum(c *C) {
1665         MinimumRetryDelay = time.Second / 2
1666         dc := delayCalculator{InitialMaxDelay: time.Millisecond}
1667         for i := 0; i < 20; i++ {
1668                 c.Check(dc.Next(), Equals, time.Second/2)
1669         }
1670 }
1671
1672 func checkInterval(c *C, t, min, max time.Duration) {
1673         c.Check(t >= min, Equals, true, Commentf("got %v which is below expected min %v", t, min))
1674         c.Check(t <= max, Equals, true, Commentf("got %v which is above expected max %v", t, max))
1675 }