Merge branch '17204-not-modified'
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "net"
16         "net/http"
17         "os"
18         "strings"
19         "sync"
20         "testing"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
24         "git.arvados.org/arvados.git/sdk/go/arvadostest"
25         . "gopkg.in/check.v1"
26         check "gopkg.in/check.v1"
27 )
28
29 // Gocheck boilerplate: Test is the "go test" entry point and hands control to gocheck's TestingT.
30 func Test(t *testing.T) {
31         TestingT(t)
32 }
33
34 // Gocheck boilerplate
35 var _ = Suite(&ServerRequiredSuite{})
36 var _ = Suite(&StandaloneSuite{})
37
38 // Tests that require the Keep server running
39 type ServerRequiredSuite struct{}
40
41 // Standalone tests
42 type StandaloneSuite struct{}
43
44 func (s *StandaloneSuite) SetUpTest(c *C) {
45         RefreshServiceDiscovery()
46 }
47
// pythonDir returns "<cwd>/../../python/tests", where <cwd> is the
// current working directory. An error from os.Getwd is ignored; in that
// case the returned path starts with whatever os.Getwd handed back.
func pythonDir() string {
	const relTests = "../../python/tests"
	wd, _ := os.Getwd()
	return fmt.Sprintf("%s/%s", wd, relTests)
}
52
53 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
54         arvadostest.StartAPI()
55         arvadostest.StartKeep(2, false)
56 }
57
58 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
59         arvadostest.StopKeep(2)
60         arvadostest.StopAPI()
61 }
62
63 func (s *ServerRequiredSuite) SetUpTest(c *C) {
64         RefreshServiceDiscovery()
65 }
66
67 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
68         arv, err := arvadosclient.MakeArvadosClient()
69         c.Assert(err, Equals, nil)
70
71         kc, err := MakeKeepClient(arv)
72
73         c.Assert(err, Equals, nil)
74         c.Check(len(kc.LocalRoots()), Equals, 2)
75         for _, root := range kc.LocalRoots() {
76                 c.Check(root, Matches, "http://localhost:\\d+")
77         }
78 }
79
80 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
81         arv, err := arvadosclient.MakeArvadosClient()
82         c.Assert(err, Equals, nil)
83
84         kc, err := MakeKeepClient(arv)
85         c.Check(err, IsNil)
86         c.Assert(kc.Want_replicas, Equals, 2)
87
88         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
89         kc, err = MakeKeepClient(arv)
90         c.Check(err, IsNil)
91         c.Assert(kc.Want_replicas, Equals, 3)
92
93         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
94         kc, err = MakeKeepClient(arv)
95         c.Check(err, IsNil)
96         c.Assert(kc.Want_replicas, Equals, 1)
97 }
98
99 type StubPutHandler struct {
100         c                    *C
101         expectPath           string
102         expectAPIToken       string
103         expectBody           string
104         expectStorageClass   string
105         returnStorageClasses string
106         handled              chan string
107         requests             []*http.Request
108         mtx                  sync.Mutex
109 }
110
111 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
112         sph.mtx.Lock()
113         sph.requests = append(sph.requests, req)
114         sph.mtx.Unlock()
115         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
116         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
117         if sph.expectStorageClass != "*" {
118                 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
119         }
120         body, err := ioutil.ReadAll(req.Body)
121         sph.c.Check(err, Equals, nil)
122         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
123         resp.Header().Set("X-Keep-Replicas-Stored", "1")
124         if sph.returnStorageClasses != "" {
125                 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
126         }
127         resp.WriteHeader(200)
128         sph.handled <- fmt.Sprintf("http://%s", req.Host)
129 }
130
131 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
132         var err error
133         // If we don't explicitly bind it to localhost, ks.listener.Addr() will
134         // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
135         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
136         if err != nil {
137                 panic(fmt.Sprintf("Could not listen on any port"))
138         }
139         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
140         go http.Serve(ks.listener, st)
141         return
142 }
143
144 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
145         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
146
147         ks := RunFakeKeepServer(st)
148         defer ks.listener.Close()
149
150         arv, _ := arvadosclient.MakeArvadosClient()
151         arv.ApiToken = "abc123"
152
153         kc, _ := MakeKeepClient(arv)
154
155         reader, writer := io.Pipe()
156         uploadStatusChan := make(chan uploadStatus)
157
158         f(kc, ks.url, reader, writer, uploadStatusChan)
159 }
160
161 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
162         log.Printf("TestUploadToStubKeepServer")
163
164         st := &StubPutHandler{
165                 c:                    c,
166                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
167                 expectAPIToken:       "abc123",
168                 expectBody:           "foo",
169                 expectStorageClass:   "",
170                 returnStorageClasses: "default=1",
171                 handled:              make(chan string),
172         }
173
174         UploadToStubHelper(c, st,
175                 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
176                         go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
177
178                         writer.Write([]byte("foo"))
179                         writer.Close()
180
181                         <-st.handled
182                         status := <-uploadStatusChan
183                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
184                 })
185 }
186
187 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
188         st := &StubPutHandler{
189                 c:                    c,
190                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
191                 expectAPIToken:       "abc123",
192                 expectBody:           "foo",
193                 expectStorageClass:   "",
194                 returnStorageClasses: "default=1",
195                 handled:              make(chan string),
196         }
197
198         UploadToStubHelper(c, st,
199                 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
200                         go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
201
202                         <-st.handled
203
204                         status := <-uploadStatusChan
205                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
206                 })
207 }
208
209 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
210         for _, trial := range []struct {
211                 respHeader string
212                 expectMap  map[string]int
213         }{
214                 {"", nil},
215                 {"foo=1", map[string]int{"foo": 1}},
216                 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
217                 {" =foo=1 ", nil},
218                 {"foo", nil},
219         } {
220                 st := &StubPutHandler{
221                         c:                    c,
222                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
223                         expectAPIToken:       "abc123",
224                         expectBody:           "foo",
225                         expectStorageClass:   "",
226                         returnStorageClasses: trial.respHeader,
227                         handled:              make(chan string),
228                 }
229
230                 UploadToStubHelper(c, st,
231                         func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
232                                 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
233
234                                 writer.Write([]byte("foo"))
235                                 writer.Close()
236
237                                 <-st.handled
238                                 status := <-uploadStatusChan
239                                 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
240                         })
241         }
242 }
243
244 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
245         nServers := 5
246         for _, trial := range []struct {
247                 replicas    int
248                 classes     []string
249                 minRequests int
250                 maxRequests int
251                 success     bool
252         }{
253                 {1, []string{"class1"}, 1, 1, true},
254                 {2, []string{"class1"}, 1, 2, true},
255                 {3, []string{"class1"}, 2, 3, true},
256                 {1, []string{"class1", "class2"}, 1, 1, true},
257                 {nServers*2 + 1, []string{"class1"}, nServers, nServers, false},
258                 {1, []string{"class404"}, nServers, nServers, false},
259                 {1, []string{"class1", "class404"}, nServers, nServers, false},
260         } {
261                 c.Logf("%+v", trial)
262                 st := &StubPutHandler{
263                         c:                    c,
264                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
265                         expectAPIToken:       "abc123",
266                         expectBody:           "foo",
267                         expectStorageClass:   "*",
268                         returnStorageClasses: "class1=2, class2=2",
269                         handled:              make(chan string, 100),
270                 }
271                 ks := RunSomeFakeKeepServers(st, nServers)
272                 arv, _ := arvadosclient.MakeArvadosClient()
273                 kc, _ := MakeKeepClient(arv)
274                 kc.Want_replicas = trial.replicas
275                 kc.StorageClasses = trial.classes
276                 arv.ApiToken = "abc123"
277                 localRoots := make(map[string]string)
278                 writableLocalRoots := make(map[string]string)
279                 for i, k := range ks {
280                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
281                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
282                         defer k.listener.Close()
283                 }
284                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
285
286                 _, _, err := kc.PutB([]byte("foo"))
287                 if trial.success {
288                         c.Check(err, check.IsNil)
289                 } else {
290                         c.Check(err, check.NotNil)
291                 }
292                 c.Check(len(st.handled) >= trial.minRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
293                 c.Check(len(st.handled) <= trial.maxRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
294                 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, check.Equals, true) {
295                         // Max concurrency should be 1. First request
296                         // should have succeeded for class1. Second
297                         // request should only ask for class404.
298                         c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), check.Equals, "class404")
299                 }
300         }
301 }
302
// FailHandler is a stub HTTP handler that answers every request with
// status 500 and then reports the request's host on handled.
type FailHandler struct {
	handled chan string // receives "http://<host>" after each response
}

// ServeHTTP writes a 500 status and then sends "http://<req.Host>" on
// fh.handled. The send blocks if the channel has no free buffer slot.
func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusInternalServerError)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
311
// FailThenSucceedHandler is a stub HTTP handler that fails the first
// request it sees with status 500 and delegates every later request to
// successhandler. It also records the X-Request-Id header of every
// request in reqIDs.
type FailThenSucceedHandler struct {
	handled        chan string  // receives "http://<host>" for the failed request only
	count          int          // number of requests answered with 500 so far
	successhandler http.Handler // serves every request after the first
	reqIDs         []string     // X-Request-Id header of each request, in arrival order
}

// ServeHTTP records the request ID, then either fails the request (the
// first time through) or hands it to successhandler. The fields are
// updated without any locking.
func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
	if fh.count != 0 {
		fh.successhandler.ServeHTTP(resp, req)
		return
	}
	resp.WriteHeader(http.StatusInternalServerError)
	fh.count++
	fh.handled <- fmt.Sprintf("http://%s", req.Host)
}
329
// Error404Handler is a stub HTTP handler that answers every request
// with status 404 and then reports the request's host on handled.
type Error404Handler struct {
	handled chan string // receives "http://<host>" after each response
}

// ServeHTTP writes a 404 status and then sends "http://<req.Host>" on
// fh.handled. The send blocks if the channel has no free buffer slot.
func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusNotFound)
	origin := fmt.Sprintf("http://%s", req.Host)
	fh.handled <- origin
}
338
339 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
340         st := FailHandler{
341                 make(chan string)}
342
343         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
344
345         UploadToStubHelper(c, st,
346                 func(kc *KeepClient, url string, reader io.ReadCloser,
347                         writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
348
349                         go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
350
351                         writer.Write([]byte("foo"))
352                         writer.Close()
353
354                         <-st.handled
355
356                         status := <-uploadStatusChan
357                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
358                         c.Check(status.statusCode, Equals, 500)
359                 })
360 }
361
362 type KeepServer struct {
363         listener net.Listener
364         url      string
365 }
366
367 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
368         ks = make([]KeepServer, n)
369
370         for i := 0; i < n; i++ {
371                 ks[i] = RunFakeKeepServer(st)
372         }
373
374         return ks
375 }
376
377 func (s *StandaloneSuite) TestPutB(c *C) {
378         hash := Md5String("foo")
379
380         st := &StubPutHandler{
381                 c:                    c,
382                 expectPath:           hash,
383                 expectAPIToken:       "abc123",
384                 expectBody:           "foo",
385                 expectStorageClass:   "",
386                 returnStorageClasses: "",
387                 handled:              make(chan string, 5),
388         }
389
390         arv, _ := arvadosclient.MakeArvadosClient()
391         kc, _ := MakeKeepClient(arv)
392
393         kc.Want_replicas = 2
394         arv.ApiToken = "abc123"
395         localRoots := make(map[string]string)
396         writableLocalRoots := make(map[string]string)
397
398         ks := RunSomeFakeKeepServers(st, 5)
399
400         for i, k := range ks {
401                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
402                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
403                 defer k.listener.Close()
404         }
405
406         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
407
408         kc.PutB([]byte("foo"))
409
410         shuff := NewRootSorter(
411                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
412
413         s1 := <-st.handled
414         s2 := <-st.handled
415         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
416                 (s1 == shuff[1] && s2 == shuff[0]),
417                 Equals,
418                 true)
419 }
420
421 func (s *StandaloneSuite) TestPutHR(c *C) {
422         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
423
424         st := &StubPutHandler{
425                 c:                    c,
426                 expectPath:           hash,
427                 expectAPIToken:       "abc123",
428                 expectBody:           "foo",
429                 expectStorageClass:   "",
430                 returnStorageClasses: "",
431                 handled:              make(chan string, 5),
432         }
433
434         arv, _ := arvadosclient.MakeArvadosClient()
435         kc, _ := MakeKeepClient(arv)
436
437         kc.Want_replicas = 2
438         arv.ApiToken = "abc123"
439         localRoots := make(map[string]string)
440         writableLocalRoots := make(map[string]string)
441
442         ks := RunSomeFakeKeepServers(st, 5)
443
444         for i, k := range ks {
445                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
446                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
447                 defer k.listener.Close()
448         }
449
450         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
451
452         reader, writer := io.Pipe()
453
454         go func() {
455                 writer.Write([]byte("foo"))
456                 writer.Close()
457         }()
458
459         kc.PutHR(hash, reader, 3)
460
461         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
462
463         s1 := <-st.handled
464         s2 := <-st.handled
465
466         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
467                 (s1 == shuff[1] && s2 == shuff[0]),
468                 Equals,
469                 true)
470 }
471
472 func (s *StandaloneSuite) TestPutWithFail(c *C) {
473         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
474
475         st := &StubPutHandler{
476                 c:                    c,
477                 expectPath:           hash,
478                 expectAPIToken:       "abc123",
479                 expectBody:           "foo",
480                 expectStorageClass:   "",
481                 returnStorageClasses: "",
482                 handled:              make(chan string, 4),
483         }
484
485         fh := FailHandler{
486                 make(chan string, 1)}
487
488         arv, err := arvadosclient.MakeArvadosClient()
489         c.Check(err, IsNil)
490         kc, _ := MakeKeepClient(arv)
491
492         kc.Want_replicas = 2
493         arv.ApiToken = "abc123"
494         localRoots := make(map[string]string)
495         writableLocalRoots := make(map[string]string)
496
497         ks1 := RunSomeFakeKeepServers(st, 4)
498         ks2 := RunSomeFakeKeepServers(fh, 1)
499
500         for i, k := range ks1 {
501                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
502                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
503                 defer k.listener.Close()
504         }
505         for i, k := range ks2 {
506                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
507                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
508                 defer k.listener.Close()
509         }
510
511         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
512
513         shuff := NewRootSorter(
514                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
515         c.Logf("%+v", shuff)
516
517         phash, replicas, err := kc.PutB([]byte("foo"))
518
519         <-fh.handled
520
521         c.Check(err, Equals, nil)
522         c.Check(phash, Equals, "")
523         c.Check(replicas, Equals, 2)
524
525         s1 := <-st.handled
526         s2 := <-st.handled
527
528         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
529                 (s1 == shuff[2] && s2 == shuff[1]),
530                 Equals,
531                 true)
532 }
533
534 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
535         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
536
537         st := &StubPutHandler{
538                 c:                    c,
539                 expectPath:           hash,
540                 expectAPIToken:       "abc123",
541                 expectBody:           "foo",
542                 expectStorageClass:   "",
543                 returnStorageClasses: "",
544                 handled:              make(chan string, 1),
545         }
546
547         fh := FailHandler{
548                 make(chan string, 4)}
549
550         arv, err := arvadosclient.MakeArvadosClient()
551         c.Check(err, IsNil)
552         kc, _ := MakeKeepClient(arv)
553
554         kc.Want_replicas = 2
555         kc.Retries = 0
556         arv.ApiToken = "abc123"
557         localRoots := make(map[string]string)
558         writableLocalRoots := make(map[string]string)
559
560         ks1 := RunSomeFakeKeepServers(st, 1)
561         ks2 := RunSomeFakeKeepServers(fh, 4)
562
563         for i, k := range ks1 {
564                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
565                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
566                 defer k.listener.Close()
567         }
568         for i, k := range ks2 {
569                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
570                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
571                 defer k.listener.Close()
572         }
573
574         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
575
576         _, replicas, err := kc.PutB([]byte("foo"))
577
578         c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
579         c.Check(replicas, Equals, 1)
580         c.Check(<-st.handled, Equals, ks1[0].url)
581 }
582
583 type StubGetHandler struct {
584         c              *C
585         expectPath     string
586         expectAPIToken string
587         httpStatus     int
588         body           []byte
589 }
590
591 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
592         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
593         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
594         resp.WriteHeader(sgh.httpStatus)
595         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
596         resp.Write(sgh.body)
597 }
598
599 func (s *StandaloneSuite) TestGet(c *C) {
600         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
601
602         st := StubGetHandler{
603                 c,
604                 hash,
605                 "abc123",
606                 http.StatusOK,
607                 []byte("foo")}
608
609         ks := RunFakeKeepServer(st)
610         defer ks.listener.Close()
611
612         arv, err := arvadosclient.MakeArvadosClient()
613         c.Check(err, IsNil)
614         kc, _ := MakeKeepClient(arv)
615         arv.ApiToken = "abc123"
616         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
617
618         r, n, url2, err := kc.Get(hash)
619         defer r.Close()
620         c.Check(err, Equals, nil)
621         c.Check(n, Equals, int64(3))
622         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
623
624         content, err2 := ioutil.ReadAll(r)
625         c.Check(err2, Equals, nil)
626         c.Check(content, DeepEquals, []byte("foo"))
627 }
628
629 func (s *StandaloneSuite) TestGet404(c *C) {
630         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
631
632         st := Error404Handler{make(chan string, 1)}
633
634         ks := RunFakeKeepServer(st)
635         defer ks.listener.Close()
636
637         arv, err := arvadosclient.MakeArvadosClient()
638         c.Check(err, IsNil)
639         kc, _ := MakeKeepClient(arv)
640         arv.ApiToken = "abc123"
641         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
642
643         r, n, url2, err := kc.Get(hash)
644         c.Check(err, Equals, BlockNotFound)
645         c.Check(n, Equals, int64(0))
646         c.Check(url2, Equals, "")
647         c.Check(r, Equals, nil)
648 }
649
650 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
651         st := Error404Handler{make(chan string, 1)}
652
653         ks := RunFakeKeepServer(st)
654         defer ks.listener.Close()
655
656         arv, err := arvadosclient.MakeArvadosClient()
657         c.Check(err, IsNil)
658         kc, _ := MakeKeepClient(arv)
659         arv.ApiToken = "abc123"
660         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
661
662         r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
663         c.Check(err, IsNil)
664         c.Check(n, Equals, int64(0))
665         c.Check(url2, Equals, "")
666         c.Assert(r, NotNil)
667         buf, err := ioutil.ReadAll(r)
668         c.Check(err, IsNil)
669         c.Check(buf, DeepEquals, []byte{})
670 }
671
672 func (s *StandaloneSuite) TestGetFail(c *C) {
673         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
674
675         st := FailHandler{make(chan string, 1)}
676
677         ks := RunFakeKeepServer(st)
678         defer ks.listener.Close()
679
680         arv, err := arvadosclient.MakeArvadosClient()
681         c.Check(err, IsNil)
682         kc, _ := MakeKeepClient(arv)
683         arv.ApiToken = "abc123"
684         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
685         kc.Retries = 0
686
687         r, n, url2, err := kc.Get(hash)
688         errNotFound, _ := err.(*ErrNotFound)
689         c.Check(errNotFound, NotNil)
690         c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
691         c.Check(errNotFound.Temporary(), Equals, true)
692         c.Check(n, Equals, int64(0))
693         c.Check(url2, Equals, "")
694         c.Check(r, Equals, nil)
695 }
696
697 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
698         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
699
700         st := &FailThenSucceedHandler{
701                 handled: make(chan string, 1),
702                 successhandler: StubGetHandler{
703                         c,
704                         hash,
705                         "abc123",
706                         http.StatusOK,
707                         []byte("foo")}}
708
709         ks := RunFakeKeepServer(st)
710         defer ks.listener.Close()
711
712         arv, err := arvadosclient.MakeArvadosClient()
713         c.Check(err, IsNil)
714         kc, _ := MakeKeepClient(arv)
715         arv.ApiToken = "abc123"
716         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
717
718         r, n, url2, err := kc.Get(hash)
719         defer r.Close()
720         c.Check(err, Equals, nil)
721         c.Check(n, Equals, int64(3))
722         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
723
724         content, err2 := ioutil.ReadAll(r)
725         c.Check(err2, Equals, nil)
726         c.Check(content, DeepEquals, []byte("foo"))
727
728         c.Logf("%q", st.reqIDs)
729         c.Assert(len(st.reqIDs) > 1, Equals, true)
730         for _, reqid := range st.reqIDs {
731                 c.Check(reqid, Not(Equals), "")
732                 c.Check(reqid, Equals, st.reqIDs[0])
733         }
734 }
735
736 func (s *StandaloneSuite) TestGetNetError(c *C) {
737         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
738
739         arv, err := arvadosclient.MakeArvadosClient()
740         c.Check(err, IsNil)
741         kc, _ := MakeKeepClient(arv)
742         arv.ApiToken = "abc123"
743         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
744
745         r, n, url2, err := kc.Get(hash)
746         errNotFound, _ := err.(*ErrNotFound)
747         c.Check(errNotFound, NotNil)
748         c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
749         c.Check(errNotFound.Temporary(), Equals, true)
750         c.Check(n, Equals, int64(0))
751         c.Check(url2, Equals, "")
752         c.Check(r, Equals, nil)
753 }
754
755 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
756         uuid := "zzzzz-bi6l4-123451234512345"
757         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
758
759         // This one shouldn't be used:
760         ks0 := RunFakeKeepServer(StubGetHandler{
761                 c,
762                 "error if used",
763                 "abc123",
764                 http.StatusOK,
765                 []byte("foo")})
766         defer ks0.listener.Close()
767         // This one should be used:
768         ks := RunFakeKeepServer(StubGetHandler{
769                 c,
770                 hash + "+K@" + uuid,
771                 "abc123",
772                 http.StatusOK,
773                 []byte("foo")})
774         defer ks.listener.Close()
775
776         arv, err := arvadosclient.MakeArvadosClient()
777         c.Check(err, IsNil)
778         kc, _ := MakeKeepClient(arv)
779         arv.ApiToken = "abc123"
780         kc.SetServiceRoots(
781                 map[string]string{"x": ks0.url},
782                 nil,
783                 map[string]string{uuid: ks.url})
784
785         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
786         defer r.Close()
787         c.Check(err, Equals, nil)
788         c.Check(n, Equals, int64(3))
789         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
790
791         content, err := ioutil.ReadAll(r)
792         c.Check(err, Equals, nil)
793         c.Check(content, DeepEquals, []byte("foo"))
794 }
795
796 // Use a service hint to fetch from a local disk service, overriding
797 // rendezvous probe order.
798 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
799         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
800         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
801
802         // This one shouldn't be used, although it appears first in
803         // rendezvous probe order:
804         ks0 := RunFakeKeepServer(StubGetHandler{
805                 c,
806                 "error if used",
807                 "abc123",
808                 http.StatusOK,
809                 []byte("foo")})
810         defer ks0.listener.Close()
811         // This one should be used:
812         ks := RunFakeKeepServer(StubGetHandler{
813                 c,
814                 hash + "+K@" + uuid,
815                 "abc123",
816                 http.StatusOK,
817                 []byte("foo")})
818         defer ks.listener.Close()
819
820         arv, err := arvadosclient.MakeArvadosClient()
821         c.Check(err, IsNil)
822         kc, _ := MakeKeepClient(arv)
823         arv.ApiToken = "abc123"
824         kc.SetServiceRoots(
825                 map[string]string{
826                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
827                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
828                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
829                         uuid:                          ks.url},
830                 nil,
831                 map[string]string{
832                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
833                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
834                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
835                         uuid:                          ks.url},
836         )
837
838         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
839         defer r.Close()
840         c.Check(err, Equals, nil)
841         c.Check(n, Equals, int64(3))
842         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
843
844         content, err := ioutil.ReadAll(r)
845         c.Check(err, Equals, nil)
846         c.Check(content, DeepEquals, []byte("foo"))
847 }
848
849 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
850         uuid := "zzzzz-bi6l4-123451234512345"
851         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
852
853         ksLocal := RunFakeKeepServer(StubGetHandler{
854                 c,
855                 hash + "+K@" + uuid,
856                 "abc123",
857                 http.StatusOK,
858                 []byte("foo")})
859         defer ksLocal.listener.Close()
860         ksGateway := RunFakeKeepServer(StubGetHandler{
861                 c,
862                 hash + "+K@" + uuid,
863                 "abc123",
864                 http.StatusInternalServerError,
865                 []byte("Error")})
866         defer ksGateway.listener.Close()
867
868         arv, err := arvadosclient.MakeArvadosClient()
869         c.Check(err, IsNil)
870         kc, _ := MakeKeepClient(arv)
871         arv.ApiToken = "abc123"
872         kc.SetServiceRoots(
873                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
874                 nil,
875                 map[string]string{uuid: ksGateway.url})
876
877         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
878         c.Assert(err, Equals, nil)
879         defer r.Close()
880         c.Check(n, Equals, int64(3))
881         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
882
883         content, err := ioutil.ReadAll(r)
884         c.Check(err, Equals, nil)
885         c.Check(content, DeepEquals, []byte("foo"))
886 }
887
// BarHandler is an HTTP handler stub that answers every request with
// the body "bar", regardless of what was asked for.
type BarHandler struct {
	// handled receives "http://<request host>" once per request,
	// after the response body has been written.
	handled chan string
}

// ServeHTTP writes "bar" to resp and then reports the request's host
// on h.handled. The send blocks until the channel can accept it.
func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	payload := []byte("bar")
	resp.Write(payload)

	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
896
897 func (s *StandaloneSuite) TestChecksum(c *C) {
898         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
899         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
900
901         st := BarHandler{make(chan string, 1)}
902
903         ks := RunFakeKeepServer(st)
904         defer ks.listener.Close()
905
906         arv, err := arvadosclient.MakeArvadosClient()
907         c.Check(err, IsNil)
908         kc, _ := MakeKeepClient(arv)
909         arv.ApiToken = "abc123"
910         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
911
912         r, n, _, err := kc.Get(barhash)
913         c.Check(err, IsNil)
914         _, err = ioutil.ReadAll(r)
915         c.Check(n, Equals, int64(3))
916         c.Check(err, Equals, nil)
917
918         <-st.handled
919
920         r, n, _, err = kc.Get(foohash)
921         c.Check(err, IsNil)
922         _, err = ioutil.ReadAll(r)
923         c.Check(n, Equals, int64(3))
924         c.Check(err, Equals, BadChecksum)
925
926         <-st.handled
927 }
928
929 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
930         content := []byte("waz")
931         hash := fmt.Sprintf("%x", md5.Sum(content))
932
933         fh := Error404Handler{
934                 make(chan string, 4)}
935
936         st := StubGetHandler{
937                 c,
938                 hash,
939                 "abc123",
940                 http.StatusOK,
941                 content}
942
943         arv, err := arvadosclient.MakeArvadosClient()
944         c.Check(err, IsNil)
945         kc, _ := MakeKeepClient(arv)
946         arv.ApiToken = "abc123"
947         localRoots := make(map[string]string)
948         writableLocalRoots := make(map[string]string)
949
950         ks1 := RunSomeFakeKeepServers(st, 1)
951         ks2 := RunSomeFakeKeepServers(fh, 4)
952
953         for i, k := range ks1 {
954                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
955                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
956                 defer k.listener.Close()
957         }
958         for i, k := range ks2 {
959                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
960                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
961                 defer k.listener.Close()
962         }
963
964         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
965         kc.Retries = 0
966
967         // This test works only if one of the failing services is
968         // attempted before the succeeding service. Otherwise,
969         // <-fh.handled below will just hang! (Probe order depends on
970         // the choice of block content "waz" and the UUIDs of the fake
971         // servers, so we just tried different strings until we found
972         // an example that passes this Assert.)
973         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
974
975         r, n, url2, err := kc.Get(hash)
976
977         <-fh.handled
978         c.Check(err, Equals, nil)
979         c.Check(n, Equals, int64(3))
980         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
981
982         readContent, err2 := ioutil.ReadAll(r)
983         c.Check(err2, Equals, nil)
984         c.Check(readContent, DeepEquals, content)
985 }
986
987 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
988         content := []byte("TestPutGetHead")
989
990         arv, err := arvadosclient.MakeArvadosClient()
991         c.Check(err, IsNil)
992         kc, err := MakeKeepClient(arv)
993         c.Assert(err, Equals, nil)
994
995         hash := fmt.Sprintf("%x", md5.Sum(content))
996
997         {
998                 n, _, err := kc.Ask(hash)
999                 c.Check(err, Equals, BlockNotFound)
1000                 c.Check(n, Equals, int64(0))
1001         }
1002         {
1003                 hash2, replicas, err := kc.PutB(content)
1004                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
1005                 c.Check(replicas, Equals, 2)
1006                 c.Check(err, Equals, nil)
1007         }
1008         {
1009                 r, n, url2, err := kc.Get(hash)
1010                 c.Check(err, Equals, nil)
1011                 c.Check(n, Equals, int64(len(content)))
1012                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1013
1014                 readContent, err2 := ioutil.ReadAll(r)
1015                 c.Check(err2, Equals, nil)
1016                 c.Check(readContent, DeepEquals, content)
1017         }
1018         {
1019                 n, url2, err := kc.Ask(hash)
1020                 c.Check(err, Equals, nil)
1021                 c.Check(n, Equals, int64(len(content)))
1022                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1023         }
1024         {
1025                 loc, err := kc.LocalLocator(hash)
1026                 c.Check(err, Equals, nil)
1027                 c.Assert(len(loc) >= 32, Equals, true)
1028                 c.Check(loc[:32], Equals, hash[:32])
1029         }
1030         {
1031                 content := []byte("the perth county conspiracy")
1032                 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1033                 c.Check(loc, Equals, "")
1034                 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1035                 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
1036         }
1037 }
1038
// StubProxyHandler is an HTTP handler stub that claims, via the
// X-Keep-Replicas-Stored response header, to have stored two replicas
// for every request it receives.
type StubProxyHandler struct {
	// handled receives "http://<request host>" once per request.
	handled chan string
}

// ServeHTTP sets X-Keep-Replicas-Stored to "2" and then reports the
// request's host on h.handled. It writes no body and no explicit
// status code.
func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	const replicasHeader = "X-Keep-Replicas-Stored"
	resp.Header().Set(replicasHeader, "2")

	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
1047
1048 func (s *StandaloneSuite) TestPutProxy(c *C) {
1049         st := StubProxyHandler{make(chan string, 1)}
1050
1051         arv, err := arvadosclient.MakeArvadosClient()
1052         c.Check(err, IsNil)
1053         kc, _ := MakeKeepClient(arv)
1054
1055         kc.Want_replicas = 2
1056         arv.ApiToken = "abc123"
1057         localRoots := make(map[string]string)
1058         writableLocalRoots := make(map[string]string)
1059
1060         ks1 := RunSomeFakeKeepServers(st, 1)
1061
1062         for i, k := range ks1 {
1063                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1064                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1065                 defer k.listener.Close()
1066         }
1067
1068         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1069
1070         _, replicas, err := kc.PutB([]byte("foo"))
1071         <-st.handled
1072
1073         c.Check(err, Equals, nil)
1074         c.Check(replicas, Equals, 2)
1075 }
1076
1077 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1078         st := StubProxyHandler{make(chan string, 1)}
1079
1080         arv, err := arvadosclient.MakeArvadosClient()
1081         c.Check(err, IsNil)
1082         kc, _ := MakeKeepClient(arv)
1083
1084         kc.Want_replicas = 3
1085         arv.ApiToken = "abc123"
1086         localRoots := make(map[string]string)
1087         writableLocalRoots := make(map[string]string)
1088
1089         ks1 := RunSomeFakeKeepServers(st, 1)
1090
1091         for i, k := range ks1 {
1092                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1093                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1094                 defer k.listener.Close()
1095         }
1096         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1097
1098         _, replicas, err := kc.PutB([]byte("foo"))
1099         <-st.handled
1100
1101         c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1102         c.Check(replicas, Equals, 2)
1103 }
1104
1105 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1106         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1107         c.Check(err, Equals, nil)
1108         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1109         c.Check(l.Size, Equals, 3)
1110         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
1111 }
1112
1113 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1114         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1115         c.Check(err, Equals, nil)
1116         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1117         c.Check(l.Size, Equals, -1)
1118         c.Check(l.Hints, DeepEquals, []string{})
1119 }
1120
1121 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1122         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1123         c.Check(err, Equals, nil)
1124         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1125         c.Check(l.Size, Equals, -1)
1126         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
1127 }
1128
1129 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1130         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1131         l, err := MakeLocator(str)
1132         c.Check(err, Equals, nil)
1133         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1134         c.Check(l.Size, Equals, 3)
1135         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1136         c.Check(l.String(), Equals, str)
1137 }
1138
1139 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1140         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1141         c.Check(err, Equals, InvalidLocatorError)
1142 }
1143
1144 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1145         hash := Md5String("foo")
1146
1147         st := &StubPutHandler{
1148                 c:                    c,
1149                 expectPath:           hash,
1150                 expectAPIToken:       "abc123",
1151                 expectBody:           "foo",
1152                 expectStorageClass:   "",
1153                 returnStorageClasses: "",
1154                 handled:              make(chan string, 5),
1155         }
1156
1157         arv, _ := arvadosclient.MakeArvadosClient()
1158         kc, _ := MakeKeepClient(arv)
1159
1160         kc.Want_replicas = 2
1161         arv.ApiToken = "abc123"
1162         localRoots := make(map[string]string)
1163         writableLocalRoots := make(map[string]string)
1164
1165         ks := RunSomeFakeKeepServers(st, 5)
1166
1167         for i, k := range ks {
1168                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1169                 if i == 0 {
1170                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1171                 }
1172                 defer k.listener.Close()
1173         }
1174
1175         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1176
1177         _, replicas, err := kc.PutB([]byte("foo"))
1178
1179         c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1180         c.Check(replicas, Equals, 1)
1181
1182         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1183 }
1184
1185 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1186         hash := Md5String("foo")
1187
1188         st := &StubPutHandler{
1189                 c:                    c,
1190                 expectPath:           hash,
1191                 expectAPIToken:       "abc123",
1192                 expectBody:           "foo",
1193                 expectStorageClass:   "",
1194                 returnStorageClasses: "",
1195                 handled:              make(chan string, 5),
1196         }
1197
1198         arv, _ := arvadosclient.MakeArvadosClient()
1199         kc, _ := MakeKeepClient(arv)
1200
1201         kc.Want_replicas = 2
1202         arv.ApiToken = "abc123"
1203         localRoots := make(map[string]string)
1204         writableLocalRoots := make(map[string]string)
1205
1206         ks := RunSomeFakeKeepServers(st, 5)
1207
1208         for i, k := range ks {
1209                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1210                 defer k.listener.Close()
1211         }
1212
1213         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1214
1215         _, replicas, err := kc.PutB([]byte("foo"))
1216
1217         c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1218         c.Check(replicas, Equals, 0)
1219 }
1220
1221 type StubGetIndexHandler struct {
1222         c              *C
1223         expectPath     string
1224         expectAPIToken string
1225         httpStatus     int
1226         body           []byte
1227 }
1228
1229 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1230         h.c.Check(req.URL.Path, Equals, h.expectPath)
1231         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1232         resp.WriteHeader(h.httpStatus)
1233         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1234         resp.Write(h.body)
1235 }
1236
1237 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1238         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1239
1240         st := StubGetIndexHandler{
1241                 c,
1242                 "/index",
1243                 "abc123",
1244                 http.StatusOK,
1245                 []byte(hash + "+3 1443559274\n\n")}
1246
1247         ks := RunFakeKeepServer(st)
1248         defer ks.listener.Close()
1249
1250         arv, err := arvadosclient.MakeArvadosClient()
1251         c.Assert(err, IsNil)
1252         kc, err := MakeKeepClient(arv)
1253         c.Assert(err, IsNil)
1254         arv.ApiToken = "abc123"
1255         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1256
1257         r, err := kc.GetIndex("x", "")
1258         c.Check(err, IsNil)
1259
1260         content, err2 := ioutil.ReadAll(r)
1261         c.Check(err2, Equals, nil)
1262         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1263 }
1264
1265 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1266         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1267
1268         st := StubGetIndexHandler{
1269                 c,
1270                 "/index/" + hash[0:3],
1271                 "abc123",
1272                 http.StatusOK,
1273                 []byte(hash + "+3 1443559274\n\n")}
1274
1275         ks := RunFakeKeepServer(st)
1276         defer ks.listener.Close()
1277
1278         arv, err := arvadosclient.MakeArvadosClient()
1279         c.Check(err, IsNil)
1280         kc, _ := MakeKeepClient(arv)
1281         arv.ApiToken = "abc123"
1282         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1283
1284         r, err := kc.GetIndex("x", hash[0:3])
1285         c.Assert(err, Equals, nil)
1286
1287         content, err2 := ioutil.ReadAll(r)
1288         c.Check(err2, Equals, nil)
1289         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1290 }
1291
1292 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1293         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1294
1295         st := StubGetIndexHandler{
1296                 c,
1297                 "/index/" + hash[0:3],
1298                 "abc123",
1299                 http.StatusOK,
1300                 []byte(hash)}
1301
1302         ks := RunFakeKeepServer(st)
1303         defer ks.listener.Close()
1304
1305         arv, err := arvadosclient.MakeArvadosClient()
1306         c.Check(err, IsNil)
1307         kc, _ := MakeKeepClient(arv)
1308         arv.ApiToken = "abc123"
1309         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1310
1311         _, err = kc.GetIndex("x", hash[0:3])
1312         c.Check(err, Equals, ErrIncompleteIndex)
1313 }
1314
1315 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1316         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1317
1318         st := StubGetIndexHandler{
1319                 c,
1320                 "/index/" + hash[0:3],
1321                 "abc123",
1322                 http.StatusOK,
1323                 []byte(hash)}
1324
1325         ks := RunFakeKeepServer(st)
1326         defer ks.listener.Close()
1327
1328         arv, err := arvadosclient.MakeArvadosClient()
1329         c.Check(err, IsNil)
1330         kc, _ := MakeKeepClient(arv)
1331         arv.ApiToken = "abc123"
1332         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1333
1334         _, err = kc.GetIndex("y", hash[0:3])
1335         c.Check(err, Equals, ErrNoSuchKeepServer)
1336 }
1337
1338 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1339         st := StubGetIndexHandler{
1340                 c,
1341                 "/index/abcd",
1342                 "abc123",
1343                 http.StatusOK,
1344                 []byte("\n")}
1345
1346         ks := RunFakeKeepServer(st)
1347         defer ks.listener.Close()
1348
1349         arv, err := arvadosclient.MakeArvadosClient()
1350         c.Check(err, IsNil)
1351         kc, _ := MakeKeepClient(arv)
1352         arv.ApiToken = "abc123"
1353         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1354
1355         r, err := kc.GetIndex("x", "abcd")
1356         c.Check(err, Equals, nil)
1357
1358         content, err2 := ioutil.ReadAll(r)
1359         c.Check(err2, Equals, nil)
1360         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1361 }
1362
1363 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1364         st := &FailThenSucceedHandler{
1365                 handled: make(chan string, 1),
1366                 successhandler: &StubPutHandler{
1367                         c:                    c,
1368                         expectPath:           Md5String("foo"),
1369                         expectAPIToken:       "abc123",
1370                         expectBody:           "foo",
1371                         expectStorageClass:   "",
1372                         returnStorageClasses: "",
1373                         handled:              make(chan string, 5),
1374                 },
1375         }
1376
1377         arv, _ := arvadosclient.MakeArvadosClient()
1378         kc, _ := MakeKeepClient(arv)
1379
1380         kc.Want_replicas = 2
1381         arv.ApiToken = "abc123"
1382         localRoots := make(map[string]string)
1383         writableLocalRoots := make(map[string]string)
1384
1385         ks := RunSomeFakeKeepServers(st, 2)
1386
1387         for i, k := range ks {
1388                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1389                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1390                 defer k.listener.Close()
1391         }
1392
1393         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1394
1395         hash, replicas, err := kc.PutB([]byte("foo"))
1396
1397         c.Check(err, Equals, nil)
1398         c.Check(hash, Equals, "")
1399         c.Check(replicas, Equals, 2)
1400 }
1401
1402 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1403         arv, err := arvadosclient.MakeArvadosClient()
1404         c.Assert(err, Equals, nil)
1405
1406         // Add an additional "testblobstore" keepservice
1407         blobKeepService := make(arvadosclient.Dict)
1408         err = arv.Create("keep_services",
1409                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1410                         "service_host": "localhost",
1411                         "service_port": "21321",
1412                         "service_type": "testblobstore"}},
1413                 &blobKeepService)
1414         c.Assert(err, Equals, nil)
1415         defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1416         RefreshServiceDiscovery()
1417
1418         // Make a keepclient and ensure that the testblobstore is included
1419         kc, err := MakeKeepClient(arv)
1420         c.Assert(err, Equals, nil)
1421
1422         // verify kc.LocalRoots
1423         c.Check(len(kc.LocalRoots()), Equals, 3)
1424         for _, root := range kc.LocalRoots() {
1425                 c.Check(root, Matches, "http://localhost:\\d+")
1426         }
1427         c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1428
1429         // verify kc.GatewayRoots
1430         c.Check(len(kc.GatewayRoots()), Equals, 3)
1431         for _, root := range kc.GatewayRoots() {
1432                 c.Check(root, Matches, "http://localhost:\\d+")
1433         }
1434         c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1435
1436         // verify kc.WritableLocalRoots
1437         c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1438         for _, root := range kc.WritableLocalRoots() {
1439                 c.Check(root, Matches, "http://localhost:\\d+")
1440         }
1441         c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1442
1443         c.Assert(kc.replicasPerService, Equals, 0)
1444         c.Assert(kc.foundNonDiskSvc, Equals, true)
1445         c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
1446 }