Merge branch 'main' into 21359-rightclick-newproject-bug
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "net"
16         "net/http"
17         "os"
18         "strings"
19         "sync"
20         "testing"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/arvadostest"
26         . "gopkg.in/check.v1"
27 )
28
29 // Gocheck boilerplate
30 func Test(t *testing.T) {
31         TestingT(t)
32 }
33
34 // Gocheck boilerplate
35 var _ = Suite(&ServerRequiredSuite{})
36 var _ = Suite(&StandaloneSuite{})
37
type (
	// ServerRequiredSuite groups the tests that require the Keep
	// server running.
	ServerRequiredSuite struct{}

	// StandaloneSuite groups the standalone tests.
	StandaloneSuite struct{}
)
43
// origHOME holds the value HOME had when the package was initialized, so
// the teardown hooks can restore it after tests have overridden it. An
// unset HOME yields the empty string.
var origHOME, _ = os.LookupEnv("HOME")
45
46 func (s *StandaloneSuite) SetUpTest(c *C) {
47         RefreshServiceDiscovery()
48         // Prevent cache state from leaking between test cases
49         os.Setenv("HOME", c.MkDir())
50 }
51
52 func (s *StandaloneSuite) TearDownTest(c *C) {
53         os.Setenv("HOME", origHOME)
54 }
55
// pythonDir returns the current working directory with
// "/../../python/tests" appended. The path is not cleaned, and a failure
// to determine the working directory is ignored (the prefix is then
// whatever os.Getwd returned alongside the error).
func pythonDir() string {
	workDir, _ := os.Getwd()
	return fmt.Sprint(workDir, "/../../python/tests")
}
60
61 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
62         arvadostest.StartKeep(2, false)
63 }
64
65 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
66         arvadostest.StopKeep(2)
67         os.Setenv("HOME", origHOME)
68 }
69
70 func (s *ServerRequiredSuite) SetUpTest(c *C) {
71         RefreshServiceDiscovery()
72         // Prevent cache state from leaking between test cases
73         os.Setenv("HOME", c.MkDir())
74 }
75
76 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
77         arv, err := arvadosclient.MakeArvadosClient()
78         c.Assert(err, IsNil)
79
80         kc, err := MakeKeepClient(arv)
81
82         c.Assert(err, IsNil)
83         c.Check(len(kc.LocalRoots()), Equals, 2)
84         for _, root := range kc.LocalRoots() {
85                 c.Check(root, Matches, "http://localhost:\\d+")
86         }
87 }
88
89 func (s *ServerRequiredSuite) TestDefaultStorageClasses(c *C) {
90         arv, err := arvadosclient.MakeArvadosClient()
91         c.Assert(err, IsNil)
92
93         cc, err := arv.ClusterConfig("StorageClasses")
94         c.Assert(err, IsNil)
95         c.Assert(cc, NotNil)
96         c.Assert(cc.(map[string]interface{})["default"], NotNil)
97
98         kc := New(arv)
99         c.Assert(kc.DefaultStorageClasses, DeepEquals, []string{"default"})
100 }
101
102 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
103         arv, err := arvadosclient.MakeArvadosClient()
104         c.Assert(err, IsNil)
105
106         kc, err := MakeKeepClient(arv)
107         c.Check(err, IsNil)
108         c.Assert(kc.Want_replicas, Equals, 2)
109
110         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
111         kc, err = MakeKeepClient(arv)
112         c.Check(err, IsNil)
113         c.Assert(kc.Want_replicas, Equals, 3)
114
115         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
116         kc, err = MakeKeepClient(arv)
117         c.Check(err, IsNil)
118         c.Assert(kc.Want_replicas, Equals, 1)
119 }
120
121 type StubPutHandler struct {
122         c                    *C
123         expectPath           string
124         expectAPIToken       string
125         expectBody           string
126         expectStorageClass   string
127         returnStorageClasses string
128         handled              chan string
129         requests             []*http.Request
130         mtx                  sync.Mutex
131 }
132
133 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
134         sph.mtx.Lock()
135         sph.requests = append(sph.requests, req)
136         sph.mtx.Unlock()
137         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
138         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
139         if sph.expectStorageClass != "*" {
140                 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
141         }
142         body, err := ioutil.ReadAll(req.Body)
143         sph.c.Check(err, IsNil)
144         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
145         resp.Header().Set("X-Keep-Replicas-Stored", "1")
146         if sph.returnStorageClasses != "" {
147                 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
148         }
149         resp.WriteHeader(200)
150         sph.handled <- fmt.Sprintf("http://%s", req.Host)
151 }
152
153 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
154         var err error
155         // If we don't explicitly bind it to localhost, ks.listener.Addr() will
156         // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
157         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
158         if err != nil {
159                 panic("Could not listen on any port")
160         }
161         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
162         go http.Serve(ks.listener, st)
163         return
164 }
165
166 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
167         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
168
169         ks := RunFakeKeepServer(st)
170         defer ks.listener.Close()
171
172         arv, _ := arvadosclient.MakeArvadosClient()
173         arv.ApiToken = "abc123"
174
175         kc, _ := MakeKeepClient(arv)
176
177         reader, writer := io.Pipe()
178         uploadStatusChan := make(chan uploadStatus)
179
180         f(kc, ks.url, reader, writer, uploadStatusChan)
181 }
182
183 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
184         log.Printf("TestUploadToStubKeepServer")
185
186         st := &StubPutHandler{
187                 c:                    c,
188                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
189                 expectAPIToken:       "abc123",
190                 expectBody:           "foo",
191                 expectStorageClass:   "",
192                 returnStorageClasses: "default=1",
193                 handled:              make(chan string),
194         }
195
196         UploadToStubHelper(c, st,
197                 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
198                         go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
199
200                         writer.Write([]byte("foo"))
201                         writer.Close()
202
203                         <-st.handled
204                         status := <-uploadStatusChan
205                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
206                 })
207 }
208
209 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
210         st := &StubPutHandler{
211                 c:                    c,
212                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
213                 expectAPIToken:       "abc123",
214                 expectBody:           "foo",
215                 expectStorageClass:   "",
216                 returnStorageClasses: "default=1",
217                 handled:              make(chan string),
218         }
219
220         UploadToStubHelper(c, st,
221                 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
222                         go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
223
224                         <-st.handled
225
226                         status := <-uploadStatusChan
227                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
228                 })
229 }
230
231 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
232         for _, trial := range []struct {
233                 respHeader string
234                 expectMap  map[string]int
235         }{
236                 {"", nil},
237                 {"foo=1", map[string]int{"foo": 1}},
238                 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
239                 {" =foo=1 ", nil},
240                 {"foo", nil},
241         } {
242                 st := &StubPutHandler{
243                         c:                    c,
244                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
245                         expectAPIToken:       "abc123",
246                         expectBody:           "foo",
247                         expectStorageClass:   "",
248                         returnStorageClasses: trial.respHeader,
249                         handled:              make(chan string),
250                 }
251
252                 UploadToStubHelper(c, st,
253                         func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
254                                 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
255
256                                 writer.Write([]byte("foo"))
257                                 writer.Close()
258
259                                 <-st.handled
260                                 status := <-uploadStatusChan
261                                 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
262                         })
263         }
264 }
265
266 func (s *StandaloneSuite) TestPutWithoutStorageClassesClusterSupport(c *C) {
267         nServers := 5
268         for _, trial := range []struct {
269                 replicas      int
270                 clientClasses []string
271                 putClasses    []string
272                 minRequests   int
273                 maxRequests   int
274                 success       bool
275         }{
276                 // Talking to an older cluster (no default storage classes exported
277                 // config) and no other additional storage classes requirements.
278                 {1, nil, nil, 1, 1, true},
279                 {2, nil, nil, 2, 2, true},
280                 {3, nil, nil, 3, 3, true},
281                 {nServers*2 + 1, nil, nil, nServers, nServers, false},
282
283                 {1, []string{"class1"}, nil, 1, 1, true},
284                 {2, []string{"class1"}, nil, 2, 2, true},
285                 {3, []string{"class1"}, nil, 3, 3, true},
286                 {1, []string{"class1", "class2"}, nil, 1, 1, true},
287                 {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
288
289                 {1, nil, []string{"class1"}, 1, 1, true},
290                 {2, nil, []string{"class1"}, 2, 2, true},
291                 {3, nil, []string{"class1"}, 3, 3, true},
292                 {1, nil, []string{"class1", "class2"}, 1, 1, true},
293                 {nServers*2 + 1, nil, []string{"class1"}, nServers, nServers, false},
294         } {
295                 c.Logf("%+v", trial)
296                 st := &StubPutHandler{
297                         c:                    c,
298                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
299                         expectAPIToken:       "abc123",
300                         expectBody:           "foo",
301                         expectStorageClass:   "*",
302                         returnStorageClasses: "", // Simulate old cluster without SC keep support
303                         handled:              make(chan string, 100),
304                 }
305                 ks := RunSomeFakeKeepServers(st, nServers)
306                 arv, _ := arvadosclient.MakeArvadosClient()
307                 kc, _ := MakeKeepClient(arv)
308                 kc.Want_replicas = trial.replicas
309                 kc.StorageClasses = trial.clientClasses
310                 kc.DefaultStorageClasses = nil // Simulate an old cluster without SC defaults
311                 arv.ApiToken = "abc123"
312                 localRoots := make(map[string]string)
313                 writableLocalRoots := make(map[string]string)
314                 for i, k := range ks {
315                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
316                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
317                         defer k.listener.Close()
318                 }
319                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
320
321                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
322                         Data:           []byte("foo"),
323                         StorageClasses: trial.putClasses,
324                 })
325                 if trial.success {
326                         c.Check(err, IsNil)
327                 } else {
328                         c.Check(err, NotNil)
329                 }
330                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
331                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
332                 if trial.clientClasses == nil && trial.putClasses == nil {
333                         c.Check(st.requests[0].Header.Get("X-Keep-Storage-Classes"), Equals, "")
334                 }
335         }
336 }
337
338 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
339         nServers := 5
340         for _, trial := range []struct {
341                 replicas       int
342                 defaultClasses []string
343                 clientClasses  []string // clientClasses takes precedence over defaultClasses
344                 putClasses     []string // putClasses takes precedence over clientClasses
345                 minRequests    int
346                 maxRequests    int
347                 success        bool
348         }{
349                 {1, []string{"class1"}, nil, nil, 1, 1, true},
350                 {2, []string{"class1"}, nil, nil, 1, 2, true},
351                 {3, []string{"class1"}, nil, nil, 2, 3, true},
352                 {1, []string{"class1", "class2"}, nil, nil, 1, 1, true},
353
354                 // defaultClasses doesn't matter when any of the others is specified.
355                 {1, []string{"class1"}, []string{"class1"}, nil, 1, 1, true},
356                 {2, []string{"class1"}, []string{"class1"}, nil, 1, 2, true},
357                 {3, []string{"class1"}, []string{"class1"}, nil, 2, 3, true},
358                 {1, []string{"class1"}, []string{"class1", "class2"}, nil, 1, 1, true},
359                 {3, []string{"class1"}, nil, []string{"class1"}, 2, 3, true},
360                 {1, []string{"class1"}, nil, []string{"class1", "class2"}, 1, 1, true},
361                 {1, []string{"class1"}, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
362                 {1, []string{"class1"}, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
363                 {nServers*2 + 1, []string{}, []string{"class1"}, nil, nServers, nServers, false},
364                 {1, []string{"class1"}, []string{"class404"}, nil, nServers, nServers, false},
365                 {1, []string{"class1"}, []string{"class1", "class404"}, nil, nServers, nServers, false},
366                 {1, []string{"class1"}, nil, []string{"class1", "class404"}, nServers, nServers, false},
367         } {
368                 c.Logf("%+v", trial)
369                 st := &StubPutHandler{
370                         c:                    c,
371                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
372                         expectAPIToken:       "abc123",
373                         expectBody:           "foo",
374                         expectStorageClass:   "*",
375                         returnStorageClasses: "class1=2, class2=2",
376                         handled:              make(chan string, 100),
377                 }
378                 ks := RunSomeFakeKeepServers(st, nServers)
379                 arv, _ := arvadosclient.MakeArvadosClient()
380                 kc, _ := MakeKeepClient(arv)
381                 kc.Want_replicas = trial.replicas
382                 kc.StorageClasses = trial.clientClasses
383                 kc.DefaultStorageClasses = trial.defaultClasses
384                 arv.ApiToken = "abc123"
385                 localRoots := make(map[string]string)
386                 writableLocalRoots := make(map[string]string)
387                 for i, k := range ks {
388                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
389                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
390                         defer k.listener.Close()
391                 }
392                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
393
394                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
395                         Data:           []byte("foo"),
396                         StorageClasses: trial.putClasses,
397                 })
398                 if trial.success {
399                         c.Check(err, IsNil)
400                 } else {
401                         c.Check(err, NotNil)
402                 }
403                 c.Check(len(st.handled) >= trial.minRequests, Equals, true, Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
404                 c.Check(len(st.handled) <= trial.maxRequests, Equals, true, Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
405                 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, Equals, true) {
406                         // Max concurrency should be 1. First request
407                         // should have succeeded for class1. Second
408                         // request should only ask for class404.
409                         c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), Equals, "class404")
410                 }
411         }
412 }
413
// FailHandler is an http.Handler that answers every request with HTTP 500.
type FailHandler struct {
	// handled receives "http://<host>" once per request served.
	handled chan string
}

// ServeHTTP writes a 500 status and then reports the request's host on
// fh.handled, blocking until that send can proceed.
func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	served := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- served
}
422
// FailThenSucceedHandler is an http.Handler that answers its first request
// with HTTP 500 and delegates every later request to successhandler.
type FailThenSucceedHandler struct {
	handled        chan string  // receives "http://<host>" for the failed request
	count          int          // number of requests failed so far (0 or 1)
	successhandler http.Handler // serves all requests after the first
	reqIDs         []string     // X-Request-Id header of every request, in order
}

// ServeHTTP records the request's X-Request-Id, then either fails the
// request (first call) or hands it to the success handler. Only the failed
// request is reported on fh.handled.
func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))

	if fh.count != 0 {
		fh.successhandler.ServeHTTP(resp, req)
		return
	}

	resp.WriteHeader(http.StatusInternalServerError)
	fh.count++
	fh.handled <- fmt.Sprintf("http://%s", req.Host)
}
440
// Error404Handler is an http.Handler that answers every request with
// HTTP 404.
type Error404Handler struct {
	// handled receives "http://<host>" once per request served.
	handled chan string
}

// ServeHTTP writes a 404 status and then reports the request's host on
// fh.handled, blocking until that send can proceed.
func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusNotFound)
	served := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- served
}
449
450 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
451         st := FailHandler{
452                 make(chan string)}
453
454         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
455
456         UploadToStubHelper(c, st,
457                 func(kc *KeepClient, url string, reader io.ReadCloser,
458                         writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
459
460                         go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
461
462                         writer.Write([]byte("foo"))
463                         writer.Close()
464
465                         <-st.handled
466
467                         status := <-uploadStatusChan
468                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
469                         c.Check(status.statusCode, Equals, 500)
470                 })
471 }
472
// KeepServer describes one fake Keep server started by RunFakeKeepServer.
type KeepServer struct {
	// listener is the socket the fake server accepts on; tests close it
	// to shut the server down.
	listener net.Listener
	// url is the server's base URL, "http://<listener address>".
	url string
}
477
478 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
479         ks = make([]KeepServer, n)
480
481         for i := 0; i < n; i++ {
482                 ks[i] = RunFakeKeepServer(st)
483         }
484
485         return ks
486 }
487
488 func (s *StandaloneSuite) TestPutB(c *C) {
489         hash := Md5String("foo")
490
491         st := &StubPutHandler{
492                 c:                    c,
493                 expectPath:           hash,
494                 expectAPIToken:       "abc123",
495                 expectBody:           "foo",
496                 expectStorageClass:   "default",
497                 returnStorageClasses: "",
498                 handled:              make(chan string, 5),
499         }
500
501         arv, _ := arvadosclient.MakeArvadosClient()
502         kc, _ := MakeKeepClient(arv)
503
504         kc.Want_replicas = 2
505         arv.ApiToken = "abc123"
506         localRoots := make(map[string]string)
507         writableLocalRoots := make(map[string]string)
508
509         ks := RunSomeFakeKeepServers(st, 5)
510
511         for i, k := range ks {
512                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
513                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
514                 defer k.listener.Close()
515         }
516
517         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
518
519         kc.PutB([]byte("foo"))
520
521         shuff := NewRootSorter(
522                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
523
524         s1 := <-st.handled
525         s2 := <-st.handled
526         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
527                 (s1 == shuff[1] && s2 == shuff[0]),
528                 Equals,
529                 true)
530 }
531
532 func (s *StandaloneSuite) TestPutHR(c *C) {
533         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
534
535         st := &StubPutHandler{
536                 c:                    c,
537                 expectPath:           hash,
538                 expectAPIToken:       "abc123",
539                 expectBody:           "foo",
540                 expectStorageClass:   "default",
541                 returnStorageClasses: "",
542                 handled:              make(chan string, 5),
543         }
544
545         arv, _ := arvadosclient.MakeArvadosClient()
546         kc, _ := MakeKeepClient(arv)
547
548         kc.Want_replicas = 2
549         arv.ApiToken = "abc123"
550         localRoots := make(map[string]string)
551         writableLocalRoots := make(map[string]string)
552
553         ks := RunSomeFakeKeepServers(st, 5)
554
555         for i, k := range ks {
556                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
557                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
558                 defer k.listener.Close()
559         }
560
561         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
562
563         reader, writer := io.Pipe()
564
565         go func() {
566                 writer.Write([]byte("foo"))
567                 writer.Close()
568         }()
569
570         kc.PutHR(hash, reader, 3)
571
572         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
573
574         s1 := <-st.handled
575         s2 := <-st.handled
576
577         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
578                 (s1 == shuff[1] && s2 == shuff[0]),
579                 Equals,
580                 true)
581 }
582
583 func (s *StandaloneSuite) TestPutWithFail(c *C) {
584         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
585
586         st := &StubPutHandler{
587                 c:                    c,
588                 expectPath:           hash,
589                 expectAPIToken:       "abc123",
590                 expectBody:           "foo",
591                 expectStorageClass:   "default",
592                 returnStorageClasses: "",
593                 handled:              make(chan string, 4),
594         }
595
596         fh := FailHandler{
597                 make(chan string, 1)}
598
599         arv, err := arvadosclient.MakeArvadosClient()
600         c.Check(err, IsNil)
601         kc, _ := MakeKeepClient(arv)
602
603         kc.Want_replicas = 2
604         arv.ApiToken = "abc123"
605         localRoots := make(map[string]string)
606         writableLocalRoots := make(map[string]string)
607
608         ks1 := RunSomeFakeKeepServers(st, 4)
609         ks2 := RunSomeFakeKeepServers(fh, 1)
610
611         for i, k := range ks1 {
612                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
613                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
614                 defer k.listener.Close()
615         }
616         for i, k := range ks2 {
617                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
618                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
619                 defer k.listener.Close()
620         }
621
622         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
623
624         shuff := NewRootSorter(
625                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
626         c.Logf("%+v", shuff)
627
628         phash, replicas, err := kc.PutB([]byte("foo"))
629
630         <-fh.handled
631
632         c.Check(err, IsNil)
633         c.Check(phash, Equals, "")
634         c.Check(replicas, Equals, 2)
635
636         s1 := <-st.handled
637         s2 := <-st.handled
638
639         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
640                 (s1 == shuff[2] && s2 == shuff[1]),
641                 Equals,
642                 true)
643 }
644
645 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
646         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
647
648         st := &StubPutHandler{
649                 c:                    c,
650                 expectPath:           hash,
651                 expectAPIToken:       "abc123",
652                 expectBody:           "foo",
653                 expectStorageClass:   "default",
654                 returnStorageClasses: "",
655                 handled:              make(chan string, 1),
656         }
657
658         fh := FailHandler{
659                 make(chan string, 4)}
660
661         arv, err := arvadosclient.MakeArvadosClient()
662         c.Check(err, IsNil)
663         kc, _ := MakeKeepClient(arv)
664
665         kc.Want_replicas = 2
666         kc.Retries = 0
667         arv.ApiToken = "abc123"
668         localRoots := make(map[string]string)
669         writableLocalRoots := make(map[string]string)
670
671         ks1 := RunSomeFakeKeepServers(st, 1)
672         ks2 := RunSomeFakeKeepServers(fh, 4)
673
674         for i, k := range ks1 {
675                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
676                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
677                 defer k.listener.Close()
678         }
679         for i, k := range ks2 {
680                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
681                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
682                 defer k.listener.Close()
683         }
684
685         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
686
687         _, replicas, err := kc.PutB([]byte("foo"))
688
689         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
690         c.Check(replicas, Equals, 1)
691         c.Check(<-st.handled, Equals, ks1[0].url)
692 }
693
694 type StubGetHandler struct {
695         c              *C
696         expectPath     string
697         expectAPIToken string
698         httpStatus     int
699         body           []byte
700 }
701
702 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
703         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
704         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
705         resp.WriteHeader(sgh.httpStatus)
706         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
707         resp.Write(sgh.body)
708 }
709
710 func (s *StandaloneSuite) TestGet(c *C) {
711         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
712
713         st := StubGetHandler{
714                 c,
715                 hash,
716                 "abc123",
717                 http.StatusOK,
718                 []byte("foo")}
719
720         ks := RunFakeKeepServer(st)
721         defer ks.listener.Close()
722
723         arv, err := arvadosclient.MakeArvadosClient()
724         c.Check(err, IsNil)
725         kc, _ := MakeKeepClient(arv)
726         arv.ApiToken = "abc123"
727         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
728
729         r, n, _, err := kc.Get(hash)
730         c.Assert(err, IsNil)
731         c.Check(n, Equals, int64(3))
732
733         content, err2 := ioutil.ReadAll(r)
734         c.Check(err2, IsNil)
735         c.Check(content, DeepEquals, []byte("foo"))
736         c.Check(r.Close(), IsNil)
737 }
738
739 func (s *StandaloneSuite) TestGet404(c *C) {
740         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
741
742         st := Error404Handler{make(chan string, 1)}
743
744         ks := RunFakeKeepServer(st)
745         defer ks.listener.Close()
746
747         arv, err := arvadosclient.MakeArvadosClient()
748         c.Check(err, IsNil)
749         kc, _ := MakeKeepClient(arv)
750         arv.ApiToken = "abc123"
751         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
752
753         r, n, _, err := kc.Get(hash)
754         c.Check(err, Equals, BlockNotFound)
755         c.Check(n, Equals, int64(0))
756         c.Check(r, IsNil)
757 }
758
759 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
760         st := Error404Handler{make(chan string, 1)}
761
762         ks := RunFakeKeepServer(st)
763         defer ks.listener.Close()
764
765         arv, err := arvadosclient.MakeArvadosClient()
766         c.Check(err, IsNil)
767         kc, _ := MakeKeepClient(arv)
768         arv.ApiToken = "abc123"
769         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
770
771         r, n, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
772         c.Check(err, IsNil)
773         c.Check(n, Equals, int64(0))
774         c.Assert(r, NotNil)
775         buf, err := ioutil.ReadAll(r)
776         c.Check(err, IsNil)
777         c.Check(buf, DeepEquals, []byte{})
778         c.Check(r.Close(), IsNil)
779 }
780
781 func (s *StandaloneSuite) TestGetFail(c *C) {
782         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
783
784         st := FailHandler{make(chan string, 1)}
785
786         ks := RunFakeKeepServer(st)
787         defer ks.listener.Close()
788
789         arv, err := arvadosclient.MakeArvadosClient()
790         c.Check(err, IsNil)
791         kc, _ := MakeKeepClient(arv)
792         arv.ApiToken = "abc123"
793         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
794         kc.Retries = 0
795
796         r, n, _, err := kc.Get(hash)
797         errNotFound, _ := err.(*ErrNotFound)
798         if c.Check(errNotFound, NotNil) {
799                 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
800                 c.Check(errNotFound.Temporary(), Equals, true)
801         }
802         c.Check(n, Equals, int64(0))
803         c.Check(r, IsNil)
804 }
805
806 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
807         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
808
809         st := &FailThenSucceedHandler{
810                 handled: make(chan string, 1),
811                 successhandler: StubGetHandler{
812                         c,
813                         hash,
814                         "abc123",
815                         http.StatusOK,
816                         []byte("foo")}}
817
818         ks := RunFakeKeepServer(st)
819         defer ks.listener.Close()
820
821         arv, err := arvadosclient.MakeArvadosClient()
822         c.Check(err, IsNil)
823         kc, _ := MakeKeepClient(arv)
824         arv.ApiToken = "abc123"
825         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
826
827         r, n, _, err := kc.Get(hash)
828         c.Assert(err, IsNil)
829         c.Check(n, Equals, int64(3))
830
831         content, err := ioutil.ReadAll(r)
832         c.Check(err, IsNil)
833         c.Check(content, DeepEquals, []byte("foo"))
834         c.Check(r.Close(), IsNil)
835
836         c.Logf("%q", st.reqIDs)
837         c.Assert(len(st.reqIDs) > 1, Equals, true)
838         for _, reqid := range st.reqIDs {
839                 c.Check(reqid, Not(Equals), "")
840                 c.Check(reqid, Equals, st.reqIDs[0])
841         }
842 }
843
844 func (s *StandaloneSuite) TestGetNetError(c *C) {
845         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
846
847         arv, err := arvadosclient.MakeArvadosClient()
848         c.Check(err, IsNil)
849         kc, _ := MakeKeepClient(arv)
850         arv.ApiToken = "abc123"
851         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
852
853         r, n, _, err := kc.Get(hash)
854         errNotFound, _ := err.(*ErrNotFound)
855         if c.Check(errNotFound, NotNil) {
856                 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
857                 c.Check(errNotFound.Temporary(), Equals, true)
858         }
859         c.Check(n, Equals, int64(0))
860         c.Check(r, IsNil)
861 }
862
863 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
864         uuid := "zzzzz-bi6l4-123451234512345"
865         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
866
867         // This one shouldn't be used:
868         ks0 := RunFakeKeepServer(StubGetHandler{
869                 c,
870                 "error if used",
871                 "abc123",
872                 http.StatusOK,
873                 []byte("foo")})
874         defer ks0.listener.Close()
875         // This one should be used:
876         ks := RunFakeKeepServer(StubGetHandler{
877                 c,
878                 hash + "+K@" + uuid,
879                 "abc123",
880                 http.StatusOK,
881                 []byte("foo")})
882         defer ks.listener.Close()
883
884         arv, err := arvadosclient.MakeArvadosClient()
885         c.Check(err, IsNil)
886         kc, _ := MakeKeepClient(arv)
887         arv.ApiToken = "abc123"
888         kc.SetServiceRoots(
889                 map[string]string{"x": ks0.url},
890                 nil,
891                 map[string]string{uuid: ks.url})
892
893         r, n, _, err := kc.Get(hash + "+K@" + uuid)
894         c.Assert(err, IsNil)
895         c.Check(n, Equals, int64(3))
896
897         content, err := ioutil.ReadAll(r)
898         c.Check(err, IsNil)
899         c.Check(content, DeepEquals, []byte("foo"))
900         c.Check(r.Close(), IsNil)
901 }
902
903 // Use a service hint to fetch from a local disk service, overriding
904 // rendezvous probe order.
905 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
906         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
907         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
908
909         // This one shouldn't be used, although it appears first in
910         // rendezvous probe order:
911         ks0 := RunFakeKeepServer(StubGetHandler{
912                 c,
913                 "error if used",
914                 "abc123",
915                 http.StatusBadGateway,
916                 nil})
917         defer ks0.listener.Close()
918         // This one should be used:
919         ks := RunFakeKeepServer(StubGetHandler{
920                 c,
921                 hash + "+K@" + uuid,
922                 "abc123",
923                 http.StatusOK,
924                 []byte("foo")})
925         defer ks.listener.Close()
926
927         arv, err := arvadosclient.MakeArvadosClient()
928         c.Check(err, IsNil)
929         kc, _ := MakeKeepClient(arv)
930         arv.ApiToken = "abc123"
931         kc.SetServiceRoots(
932                 map[string]string{
933                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
934                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
935                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
936                         uuid:                          ks.url},
937                 nil,
938                 map[string]string{
939                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
940                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
941                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
942                         uuid:                          ks.url},
943         )
944
945         r, n, _, err := kc.Get(hash + "+K@" + uuid)
946         c.Assert(err, IsNil)
947         c.Check(n, Equals, int64(3))
948
949         content, err := ioutil.ReadAll(r)
950         c.Check(err, IsNil)
951         c.Check(content, DeepEquals, []byte("foo"))
952         c.Check(r.Close(), IsNil)
953 }
954
955 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
956         uuid := "zzzzz-bi6l4-123451234512345"
957         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
958
959         ksLocal := RunFakeKeepServer(StubGetHandler{
960                 c,
961                 hash + "+K@" + uuid,
962                 "abc123",
963                 http.StatusOK,
964                 []byte("foo")})
965         defer ksLocal.listener.Close()
966         ksGateway := RunFakeKeepServer(StubGetHandler{
967                 c,
968                 hash + "+K@" + uuid,
969                 "abc123",
970                 http.StatusInternalServerError,
971                 []byte("Error")})
972         defer ksGateway.listener.Close()
973
974         arv, err := arvadosclient.MakeArvadosClient()
975         c.Check(err, IsNil)
976         kc, _ := MakeKeepClient(arv)
977         arv.ApiToken = "abc123"
978         kc.SetServiceRoots(
979                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
980                 nil,
981                 map[string]string{uuid: ksGateway.url})
982
983         r, n, _, err := kc.Get(hash + "+K@" + uuid)
984         c.Assert(err, IsNil)
985         c.Check(n, Equals, int64(3))
986
987         content, err := ioutil.ReadAll(r)
988         c.Check(err, IsNil)
989         c.Check(content, DeepEquals, []byte("foo"))
990         c.Check(r.Close(), IsNil)
991 }
992
// BarHandler is a test HTTP handler that answers every request with
// the body "bar" and reports each handled request on its channel.
type BarHandler struct {
	// handled receives "http://<request host>" once per request.
	handled chan string
}

// ServeHTTP writes "bar" as the response body, then sends the
// request's host, formatted as an http:// URL, on h.handled.
func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	const payload = "bar"
	resp.Write([]byte(payload))
	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
1001
1002 func (s *StandaloneSuite) TestChecksum(c *C) {
1003         foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1004         barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
1005
1006         st := BarHandler{make(chan string, 1)}
1007
1008         ks := RunFakeKeepServer(st)
1009         defer ks.listener.Close()
1010
1011         arv, err := arvadosclient.MakeArvadosClient()
1012         c.Check(err, IsNil)
1013         kc, _ := MakeKeepClient(arv)
1014         arv.ApiToken = "abc123"
1015         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1016
1017         r, n, _, err := kc.Get(barhash)
1018         if c.Check(err, IsNil) {
1019                 _, err = ioutil.ReadAll(r)
1020                 c.Check(n, Equals, int64(3))
1021                 c.Check(err, IsNil)
1022         }
1023
1024         select {
1025         case <-st.handled:
1026         case <-time.After(time.Second):
1027                 c.Fatal("timed out")
1028         }
1029
1030         r, n, _, err = kc.Get(foohash)
1031         if err == nil {
1032                 buf, readerr := ioutil.ReadAll(r)
1033                 c.Logf("%q", buf)
1034                 err = readerr
1035         }
1036         c.Check(err, Equals, BadChecksum)
1037
1038         select {
1039         case <-st.handled:
1040         case <-time.After(time.Second):
1041                 c.Fatal("timed out")
1042         }
1043 }
1044
1045 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
1046         content := []byte("waz")
1047         hash := fmt.Sprintf("%x+3", md5.Sum(content))
1048
1049         fh := Error404Handler{
1050                 make(chan string, 4)}
1051
1052         st := StubGetHandler{
1053                 c,
1054                 hash,
1055                 "abc123",
1056                 http.StatusOK,
1057                 content}
1058
1059         arv, err := arvadosclient.MakeArvadosClient()
1060         c.Check(err, IsNil)
1061         kc, _ := MakeKeepClient(arv)
1062         arv.ApiToken = "abc123"
1063         localRoots := make(map[string]string)
1064         writableLocalRoots := make(map[string]string)
1065
1066         ks1 := RunSomeFakeKeepServers(st, 1)
1067         ks2 := RunSomeFakeKeepServers(fh, 4)
1068
1069         for i, k := range ks1 {
1070                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1071                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1072                 defer k.listener.Close()
1073         }
1074         for i, k := range ks2 {
1075                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1076                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
1077                 defer k.listener.Close()
1078         }
1079
1080         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1081         kc.Retries = 0
1082
1083         // This test works only if one of the failing services is
1084         // attempted before the succeeding service. Otherwise,
1085         // <-fh.handled below will just hang! (Probe order depends on
1086         // the choice of block content "waz" and the UUIDs of the fake
1087         // servers, so we just tried different strings until we found
1088         // an example that passes this Assert.)
1089         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
1090
1091         r, n, _, err := kc.Get(hash)
1092
1093         select {
1094         case <-fh.handled:
1095         case <-time.After(time.Second):
1096                 c.Fatal("timed out")
1097         }
1098         c.Assert(err, IsNil)
1099         c.Check(n, Equals, int64(3))
1100
1101         readContent, err2 := ioutil.ReadAll(r)
1102         c.Check(err2, IsNil)
1103         c.Check(readContent, DeepEquals, content)
1104         c.Check(r.Close(), IsNil)
1105 }
1106
1107 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
1108         content := []byte("TestPutGetHead")
1109
1110         arv, err := arvadosclient.MakeArvadosClient()
1111         c.Check(err, IsNil)
1112         kc, err := MakeKeepClient(arv)
1113         c.Assert(err, IsNil)
1114
1115         hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content))
1116
1117         {
1118                 n, _, err := kc.Ask(hash)
1119                 c.Check(err, Equals, BlockNotFound)
1120                 c.Check(n, Equals, int64(0))
1121         }
1122         {
1123                 hash2, replicas, err := kc.PutB(content)
1124                 c.Check(err, IsNil)
1125                 c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`)
1126                 c.Check(replicas, Equals, 2)
1127         }
1128         {
1129                 r, n, _, err := kc.Get(hash)
1130                 c.Check(err, IsNil)
1131                 c.Check(n, Equals, int64(len(content)))
1132                 if c.Check(r, NotNil) {
1133                         readContent, err := ioutil.ReadAll(r)
1134                         c.Check(err, IsNil)
1135                         if c.Check(len(readContent), Equals, len(content)) {
1136                                 c.Check(readContent, DeepEquals, content)
1137                         }
1138                         c.Check(r.Close(), IsNil)
1139                 }
1140         }
1141         {
1142                 n, url2, err := kc.Ask(hash)
1143                 c.Check(err, IsNil)
1144                 c.Check(n, Equals, int64(len(content)))
1145                 c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E")
1146         }
1147         {
1148                 loc, err := kc.LocalLocator(hash)
1149                 c.Check(err, IsNil)
1150                 c.Assert(len(loc) >= 32, Equals, true)
1151                 c.Check(loc[:32], Equals, hash[:32])
1152         }
1153         {
1154                 content := []byte("the perth county conspiracy")
1155                 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1156                 c.Check(loc, Equals, "")
1157                 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1158                 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
1159         }
1160 }
1161
// StubProxyHandler is a test HTTP handler that claims two replicas
// were stored for every request and reports each handled request on
// its channel.
type StubProxyHandler struct {
	// handled receives "http://<request host>" once per request.
	handled chan string
}

// ServeHTTP sets the "X-Keep-Replicas-Stored" response header to "2",
// then sends the request's host, formatted as an http:// URL, on
// h.handled.
func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	hdr := resp.Header()
	hdr.Set("X-Keep-Replicas-Stored", "2")
	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
1170
1171 func (s *StandaloneSuite) TestPutProxy(c *C) {
1172         st := StubProxyHandler{make(chan string, 1)}
1173
1174         arv, err := arvadosclient.MakeArvadosClient()
1175         c.Check(err, IsNil)
1176         kc, _ := MakeKeepClient(arv)
1177
1178         kc.Want_replicas = 2
1179         arv.ApiToken = "abc123"
1180         localRoots := make(map[string]string)
1181         writableLocalRoots := make(map[string]string)
1182
1183         ks1 := RunSomeFakeKeepServers(st, 1)
1184
1185         for i, k := range ks1 {
1186                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1187                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1188                 defer k.listener.Close()
1189         }
1190
1191         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1192
1193         _, replicas, err := kc.PutB([]byte("foo"))
1194         <-st.handled
1195
1196         c.Check(err, IsNil)
1197         c.Check(replicas, Equals, 2)
1198 }
1199
1200 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1201         st := StubProxyHandler{make(chan string, 1)}
1202
1203         arv, err := arvadosclient.MakeArvadosClient()
1204         c.Check(err, IsNil)
1205         kc, _ := MakeKeepClient(arv)
1206
1207         kc.Want_replicas = 3
1208         arv.ApiToken = "abc123"
1209         localRoots := make(map[string]string)
1210         writableLocalRoots := make(map[string]string)
1211
1212         ks1 := RunSomeFakeKeepServers(st, 1)
1213
1214         for i, k := range ks1 {
1215                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1216                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1217                 defer k.listener.Close()
1218         }
1219         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1220
1221         _, replicas, err := kc.PutB([]byte("foo"))
1222         <-st.handled
1223
1224         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1225         c.Check(replicas, Equals, 2)
1226 }
1227
1228 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1229         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1230         c.Check(err, IsNil)
1231         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1232         c.Check(l.Size, Equals, 3)
1233         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
1234 }
1235
1236 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1237         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1238         c.Check(err, IsNil)
1239         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1240         c.Check(l.Size, Equals, -1)
1241         c.Check(l.Hints, DeepEquals, []string{})
1242 }
1243
1244 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1245         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1246         c.Check(err, IsNil)
1247         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1248         c.Check(l.Size, Equals, -1)
1249         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
1250 }
1251
1252 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1253         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1254         l, err := MakeLocator(str)
1255         c.Check(err, IsNil)
1256         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1257         c.Check(l.Size, Equals, 3)
1258         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1259         c.Check(l.String(), Equals, str)
1260 }
1261
1262 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1263         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1264         c.Check(err, Equals, InvalidLocatorError)
1265 }
1266
1267 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1268         hash := Md5String("foo")
1269
1270         st := &StubPutHandler{
1271                 c:                    c,
1272                 expectPath:           hash,
1273                 expectAPIToken:       "abc123",
1274                 expectBody:           "foo",
1275                 expectStorageClass:   "default",
1276                 returnStorageClasses: "",
1277                 handled:              make(chan string, 5),
1278         }
1279
1280         arv, _ := arvadosclient.MakeArvadosClient()
1281         kc, _ := MakeKeepClient(arv)
1282
1283         kc.Want_replicas = 2
1284         arv.ApiToken = "abc123"
1285         localRoots := make(map[string]string)
1286         writableLocalRoots := make(map[string]string)
1287
1288         ks := RunSomeFakeKeepServers(st, 5)
1289
1290         for i, k := range ks {
1291                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1292                 if i == 0 {
1293                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1294                 }
1295                 defer k.listener.Close()
1296         }
1297
1298         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1299
1300         _, replicas, err := kc.PutB([]byte("foo"))
1301
1302         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1303         c.Check(replicas, Equals, 1)
1304
1305         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1306 }
1307
1308 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1309         hash := Md5String("foo")
1310
1311         st := &StubPutHandler{
1312                 c:                    c,
1313                 expectPath:           hash,
1314                 expectAPIToken:       "abc123",
1315                 expectBody:           "foo",
1316                 expectStorageClass:   "",
1317                 returnStorageClasses: "",
1318                 handled:              make(chan string, 5),
1319         }
1320
1321         arv, _ := arvadosclient.MakeArvadosClient()
1322         kc, _ := MakeKeepClient(arv)
1323
1324         kc.Want_replicas = 2
1325         arv.ApiToken = "abc123"
1326         localRoots := make(map[string]string)
1327         writableLocalRoots := make(map[string]string)
1328
1329         ks := RunSomeFakeKeepServers(st, 5)
1330
1331         for i, k := range ks {
1332                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1333                 defer k.listener.Close()
1334         }
1335
1336         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1337
1338         _, replicas, err := kc.PutB([]byte("foo"))
1339
1340         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1341         c.Check(replicas, Equals, 0)
1342 }
1343
1344 type StubGetIndexHandler struct {
1345         c              *C
1346         expectPath     string
1347         expectAPIToken string
1348         httpStatus     int
1349         body           []byte
1350 }
1351
1352 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1353         h.c.Check(req.URL.Path, Equals, h.expectPath)
1354         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1355         resp.WriteHeader(h.httpStatus)
1356         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1357         resp.Write(h.body)
1358 }
1359
1360 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1361         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1362
1363         st := StubGetIndexHandler{
1364                 c,
1365                 "/index",
1366                 "abc123",
1367                 http.StatusOK,
1368                 []byte(hash + " 1443559274\n\n")}
1369
1370         ks := RunFakeKeepServer(st)
1371         defer ks.listener.Close()
1372
1373         arv, err := arvadosclient.MakeArvadosClient()
1374         c.Assert(err, IsNil)
1375         kc, err := MakeKeepClient(arv)
1376         c.Assert(err, IsNil)
1377         arv.ApiToken = "abc123"
1378         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1379
1380         r, err := kc.GetIndex("x", "")
1381         c.Check(err, IsNil)
1382
1383         content, err2 := ioutil.ReadAll(r)
1384         c.Check(err2, IsNil)
1385         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1386 }
1387
1388 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1389         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1390
1391         st := StubGetIndexHandler{
1392                 c,
1393                 "/index/" + hash[0:3],
1394                 "abc123",
1395                 http.StatusOK,
1396                 []byte(hash + " 1443559274\n\n")}
1397
1398         ks := RunFakeKeepServer(st)
1399         defer ks.listener.Close()
1400
1401         arv, err := arvadosclient.MakeArvadosClient()
1402         c.Check(err, IsNil)
1403         kc, _ := MakeKeepClient(arv)
1404         arv.ApiToken = "abc123"
1405         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1406
1407         r, err := kc.GetIndex("x", hash[0:3])
1408         c.Assert(err, IsNil)
1409
1410         content, err2 := ioutil.ReadAll(r)
1411         c.Check(err2, IsNil)
1412         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1413 }
1414
1415 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1416         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1417
1418         st := StubGetIndexHandler{
1419                 c,
1420                 "/index/" + hash[0:3],
1421                 "abc123",
1422                 http.StatusOK,
1423                 []byte(hash)}
1424
1425         ks := RunFakeKeepServer(st)
1426         defer ks.listener.Close()
1427
1428         arv, err := arvadosclient.MakeArvadosClient()
1429         c.Check(err, IsNil)
1430         kc, _ := MakeKeepClient(arv)
1431         arv.ApiToken = "abc123"
1432         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1433
1434         _, err = kc.GetIndex("x", hash[0:3])
1435         c.Check(err, Equals, ErrIncompleteIndex)
1436 }
1437
1438 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1439         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
1440
1441         st := StubGetIndexHandler{
1442                 c,
1443                 "/index/" + hash[0:3],
1444                 "abc123",
1445                 http.StatusOK,
1446                 []byte(hash)}
1447
1448         ks := RunFakeKeepServer(st)
1449         defer ks.listener.Close()
1450
1451         arv, err := arvadosclient.MakeArvadosClient()
1452         c.Check(err, IsNil)
1453         kc, _ := MakeKeepClient(arv)
1454         arv.ApiToken = "abc123"
1455         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1456
1457         _, err = kc.GetIndex("y", hash[0:3])
1458         c.Check(err, Equals, ErrNoSuchKeepServer)
1459 }
1460
1461 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1462         st := StubGetIndexHandler{
1463                 c,
1464                 "/index/abcd",
1465                 "abc123",
1466                 http.StatusOK,
1467                 []byte("\n")}
1468
1469         ks := RunFakeKeepServer(st)
1470         defer ks.listener.Close()
1471
1472         arv, err := arvadosclient.MakeArvadosClient()
1473         c.Check(err, IsNil)
1474         kc, _ := MakeKeepClient(arv)
1475         arv.ApiToken = "abc123"
1476         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1477
1478         r, err := kc.GetIndex("x", "abcd")
1479         c.Check(err, IsNil)
1480
1481         content, err2 := ioutil.ReadAll(r)
1482         c.Check(err2, IsNil)
1483         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1484 }
1485
1486 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1487         st := &FailThenSucceedHandler{
1488                 handled: make(chan string, 1),
1489                 successhandler: &StubPutHandler{
1490                         c:                    c,
1491                         expectPath:           Md5String("foo"),
1492                         expectAPIToken:       "abc123",
1493                         expectBody:           "foo",
1494                         expectStorageClass:   "default",
1495                         returnStorageClasses: "",
1496                         handled:              make(chan string, 5),
1497                 },
1498         }
1499
1500         arv, _ := arvadosclient.MakeArvadosClient()
1501         kc, _ := MakeKeepClient(arv)
1502
1503         kc.Want_replicas = 2
1504         arv.ApiToken = "abc123"
1505         localRoots := make(map[string]string)
1506         writableLocalRoots := make(map[string]string)
1507
1508         ks := RunSomeFakeKeepServers(st, 2)
1509
1510         for i, k := range ks {
1511                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1512                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1513                 defer k.listener.Close()
1514         }
1515
1516         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1517
1518         hash, replicas, err := kc.PutB([]byte("foo"))
1519
1520         c.Check(err, IsNil)
1521         c.Check(hash, Equals, "")
1522         c.Check(replicas, Equals, 2)
1523 }
1524
1525 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1526         arv, err := arvadosclient.MakeArvadosClient()
1527         c.Assert(err, IsNil)
1528
1529         // Add an additional "testblobstore" keepservice
1530         blobKeepService := make(arvadosclient.Dict)
1531         err = arv.Create("keep_services",
1532                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1533                         "service_host": "localhost",
1534                         "service_port": "21321",
1535                         "service_type": "testblobstore"}},
1536                 &blobKeepService)
1537         c.Assert(err, IsNil)
1538         defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1539         RefreshServiceDiscovery()
1540
1541         // Make a keepclient and ensure that the testblobstore is included
1542         kc, err := MakeKeepClient(arv)
1543         c.Assert(err, IsNil)
1544
1545         // verify kc.LocalRoots
1546         c.Check(len(kc.LocalRoots()), Equals, 3)
1547         for _, root := range kc.LocalRoots() {
1548                 c.Check(root, Matches, "http://localhost:\\d+")
1549         }
1550         c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1551
1552         // verify kc.GatewayRoots
1553         c.Check(len(kc.GatewayRoots()), Equals, 3)
1554         for _, root := range kc.GatewayRoots() {
1555                 c.Check(root, Matches, "http://localhost:\\d+")
1556         }
1557         c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1558
1559         // verify kc.WritableLocalRoots
1560         c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1561         for _, root := range kc.WritableLocalRoots() {
1562                 c.Check(root, Matches, "http://localhost:\\d+")
1563         }
1564         c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1565
1566         c.Assert(kc.replicasPerService, Equals, 0)
1567         c.Assert(kc.foundNonDiskSvc, Equals, true)
1568         c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
1569 }