17995: Fix merge error.
[arvados.git] / sdk / go / keepclient / keepclient_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "log"
15         "net"
16         "net/http"
17         "os"
18         "strings"
19         "sync"
20         "testing"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/arvadostest"
26         . "gopkg.in/check.v1"
27         check "gopkg.in/check.v1"
28 )
29
30 // Gocheck boilerplate
31 func Test(t *testing.T) {
32         TestingT(t)
33 }
34
35 // Gocheck boilerplate
36 var _ = Suite(&ServerRequiredSuite{})
37 var _ = Suite(&StandaloneSuite{})
38
39 // Tests that require the Keep server running
40 type ServerRequiredSuite struct{}
41
42 // Standalone tests
43 type StandaloneSuite struct{}
44
45 func (s *StandaloneSuite) SetUpTest(c *C) {
46         RefreshServiceDiscovery()
47 }
48
49 func pythonDir() string {
50         cwd, _ := os.Getwd()
51         return fmt.Sprintf("%s/../../python/tests", cwd)
52 }
53
54 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
55         arvadostest.StartKeep(2, false)
56 }
57
58 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
59         arvadostest.StopKeep(2)
60 }
61
62 func (s *ServerRequiredSuite) SetUpTest(c *C) {
63         RefreshServiceDiscovery()
64 }
65
66 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
67         arv, err := arvadosclient.MakeArvadosClient()
68         c.Assert(err, Equals, nil)
69
70         kc, err := MakeKeepClient(arv)
71
72         c.Assert(err, Equals, nil)
73         c.Check(len(kc.LocalRoots()), Equals, 2)
74         for _, root := range kc.LocalRoots() {
75                 c.Check(root, Matches, "http://localhost:\\d+")
76         }
77 }
78
79 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
80         arv, err := arvadosclient.MakeArvadosClient()
81         c.Assert(err, Equals, nil)
82
83         kc, err := MakeKeepClient(arv)
84         c.Check(err, IsNil)
85         c.Assert(kc.Want_replicas, Equals, 2)
86
87         arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
88         kc, err = MakeKeepClient(arv)
89         c.Check(err, IsNil)
90         c.Assert(kc.Want_replicas, Equals, 3)
91
92         arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
93         kc, err = MakeKeepClient(arv)
94         c.Check(err, IsNil)
95         c.Assert(kc.Want_replicas, Equals, 1)
96 }
97
98 type StubPutHandler struct {
99         c                    *C
100         expectPath           string
101         expectAPIToken       string
102         expectBody           string
103         expectStorageClass   string
104         returnStorageClasses string
105         handled              chan string
106         requests             []*http.Request
107         mtx                  sync.Mutex
108 }
109
110 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
111         sph.mtx.Lock()
112         sph.requests = append(sph.requests, req)
113         sph.mtx.Unlock()
114         sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
115         sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
116         if sph.expectStorageClass != "*" {
117                 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
118         }
119         body, err := ioutil.ReadAll(req.Body)
120         sph.c.Check(err, Equals, nil)
121         sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
122         resp.Header().Set("X-Keep-Replicas-Stored", "1")
123         if sph.returnStorageClasses != "" {
124                 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
125         }
126         resp.WriteHeader(200)
127         sph.handled <- fmt.Sprintf("http://%s", req.Host)
128 }
129
130 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
131         var err error
132         // If we don't explicitly bind it to localhost, ks.listener.Addr() will
133         // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
134         ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
135         if err != nil {
136                 panic(fmt.Sprintf("Could not listen on any port"))
137         }
138         ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
139         go http.Serve(ks.listener, st)
140         return
141 }
142
143 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
144         io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
145
146         ks := RunFakeKeepServer(st)
147         defer ks.listener.Close()
148
149         arv, _ := arvadosclient.MakeArvadosClient()
150         arv.ApiToken = "abc123"
151
152         kc, _ := MakeKeepClient(arv)
153
154         reader, writer := io.Pipe()
155         uploadStatusChan := make(chan uploadStatus)
156
157         f(kc, ks.url, reader, writer, uploadStatusChan)
158 }
159
160 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
161         log.Printf("TestUploadToStubKeepServer")
162
163         st := &StubPutHandler{
164                 c:                    c,
165                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
166                 expectAPIToken:       "abc123",
167                 expectBody:           "foo",
168                 expectStorageClass:   "",
169                 returnStorageClasses: "default=1",
170                 handled:              make(chan string),
171         }
172
173         UploadToStubHelper(c, st,
174                 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
175                         go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
176
177                         writer.Write([]byte("foo"))
178                         writer.Close()
179
180                         <-st.handled
181                         status := <-uploadStatusChan
182                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
183                 })
184 }
185
186 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
187         st := &StubPutHandler{
188                 c:                    c,
189                 expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
190                 expectAPIToken:       "abc123",
191                 expectBody:           "foo",
192                 expectStorageClass:   "",
193                 returnStorageClasses: "default=1",
194                 handled:              make(chan string),
195         }
196
197         UploadToStubHelper(c, st,
198                 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
199                         go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
200
201                         <-st.handled
202
203                         status := <-uploadStatusChan
204                         c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
205                 })
206 }
207
208 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
209         for _, trial := range []struct {
210                 respHeader string
211                 expectMap  map[string]int
212         }{
213                 {"", nil},
214                 {"foo=1", map[string]int{"foo": 1}},
215                 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
216                 {" =foo=1 ", nil},
217                 {"foo", nil},
218         } {
219                 st := &StubPutHandler{
220                         c:                    c,
221                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
222                         expectAPIToken:       "abc123",
223                         expectBody:           "foo",
224                         expectStorageClass:   "",
225                         returnStorageClasses: trial.respHeader,
226                         handled:              make(chan string),
227                 }
228
229                 UploadToStubHelper(c, st,
230                         func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
231                                 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
232
233                                 writer.Write([]byte("foo"))
234                                 writer.Close()
235
236                                 <-st.handled
237                                 status := <-uploadStatusChan
238                                 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
239                         })
240         }
241 }
242
243 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
244         nServers := 5
245         for _, trial := range []struct {
246                 replicas      int
247                 clientClasses []string
248                 putClasses    []string // putClasses takes precedence over clientClasses
249                 minRequests   int
250                 maxRequests   int
251                 success       bool
252         }{
253                 {1, []string{"class1"}, nil, 1, 1, true},
254                 {2, []string{"class1"}, nil, 1, 2, true},
255                 {3, []string{"class1"}, nil, 2, 3, true},
256                 {1, []string{"class1", "class2"}, nil, 1, 1, true},
257                 {3, nil, []string{"class1"}, 2, 3, true},
258                 {1, nil, []string{"class1", "class2"}, 1, 1, true},
259                 {1, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
260                 {1, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
261                 {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
262                 {1, []string{"class404"}, nil, nServers, nServers, false},
263                 {1, []string{"class1", "class404"}, nil, nServers, nServers, false},
264                 {1, nil, []string{"class1", "class404"}, nServers, nServers, false},
265         } {
266                 c.Logf("%+v", trial)
267                 st := &StubPutHandler{
268                         c:                    c,
269                         expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
270                         expectAPIToken:       "abc123",
271                         expectBody:           "foo",
272                         expectStorageClass:   "*",
273                         returnStorageClasses: "class1=2, class2=2",
274                         handled:              make(chan string, 100),
275                 }
276                 ks := RunSomeFakeKeepServers(st, nServers)
277                 arv, _ := arvadosclient.MakeArvadosClient()
278                 kc, _ := MakeKeepClient(arv)
279                 kc.Want_replicas = trial.replicas
280                 kc.StorageClasses = trial.clientClasses
281                 arv.ApiToken = "abc123"
282                 localRoots := make(map[string]string)
283                 writableLocalRoots := make(map[string]string)
284                 for i, k := range ks {
285                         localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
286                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
287                         defer k.listener.Close()
288                 }
289                 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
290
291                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
292                         Data:           []byte("foo"),
293                         StorageClasses: trial.putClasses,
294                 })
295                 if trial.success {
296                         c.Check(err, check.IsNil)
297                 } else {
298                         c.Check(err, check.NotNil)
299                 }
300                 c.Check(len(st.handled) >= trial.minRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
301                 c.Check(len(st.handled) <= trial.maxRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
302                 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, check.Equals, true) {
303                         // Max concurrency should be 1. First request
304                         // should have succeeded for class1. Second
305                         // request should only ask for class404.
306                         c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), check.Equals, "class404")
307                 }
308         }
309 }
310
311 type FailHandler struct {
312         handled chan string
313 }
314
315 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
316         resp.WriteHeader(500)
317         fh.handled <- fmt.Sprintf("http://%s", req.Host)
318 }
319
320 type FailThenSucceedHandler struct {
321         handled        chan string
322         count          int
323         successhandler http.Handler
324         reqIDs         []string
325 }
326
327 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
328         fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
329         if fh.count == 0 {
330                 resp.WriteHeader(500)
331                 fh.count++
332                 fh.handled <- fmt.Sprintf("http://%s", req.Host)
333         } else {
334                 fh.successhandler.ServeHTTP(resp, req)
335         }
336 }
337
338 type Error404Handler struct {
339         handled chan string
340 }
341
342 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
343         resp.WriteHeader(404)
344         fh.handled <- fmt.Sprintf("http://%s", req.Host)
345 }
346
347 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
348         st := FailHandler{
349                 make(chan string)}
350
351         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
352
353         UploadToStubHelper(c, st,
354                 func(kc *KeepClient, url string, reader io.ReadCloser,
355                         writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
356
357                         go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
358
359                         writer.Write([]byte("foo"))
360                         writer.Close()
361
362                         <-st.handled
363
364                         status := <-uploadStatusChan
365                         c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
366                         c.Check(status.statusCode, Equals, 500)
367                 })
368 }
369
370 type KeepServer struct {
371         listener net.Listener
372         url      string
373 }
374
375 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
376         ks = make([]KeepServer, n)
377
378         for i := 0; i < n; i++ {
379                 ks[i] = RunFakeKeepServer(st)
380         }
381
382         return ks
383 }
384
385 func (s *StandaloneSuite) TestPutB(c *C) {
386         hash := Md5String("foo")
387
388         st := &StubPutHandler{
389                 c:                    c,
390                 expectPath:           hash,
391                 expectAPIToken:       "abc123",
392                 expectBody:           "foo",
393                 expectStorageClass:   "",
394                 returnStorageClasses: "",
395                 handled:              make(chan string, 5),
396         }
397
398         arv, _ := arvadosclient.MakeArvadosClient()
399         kc, _ := MakeKeepClient(arv)
400
401         kc.Want_replicas = 2
402         arv.ApiToken = "abc123"
403         localRoots := make(map[string]string)
404         writableLocalRoots := make(map[string]string)
405
406         ks := RunSomeFakeKeepServers(st, 5)
407
408         for i, k := range ks {
409                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
410                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
411                 defer k.listener.Close()
412         }
413
414         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
415
416         kc.PutB([]byte("foo"))
417
418         shuff := NewRootSorter(
419                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
420
421         s1 := <-st.handled
422         s2 := <-st.handled
423         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
424                 (s1 == shuff[1] && s2 == shuff[0]),
425                 Equals,
426                 true)
427 }
428
429 func (s *StandaloneSuite) TestPutHR(c *C) {
430         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
431
432         st := &StubPutHandler{
433                 c:                    c,
434                 expectPath:           hash,
435                 expectAPIToken:       "abc123",
436                 expectBody:           "foo",
437                 expectStorageClass:   "",
438                 returnStorageClasses: "",
439                 handled:              make(chan string, 5),
440         }
441
442         arv, _ := arvadosclient.MakeArvadosClient()
443         kc, _ := MakeKeepClient(arv)
444
445         kc.Want_replicas = 2
446         arv.ApiToken = "abc123"
447         localRoots := make(map[string]string)
448         writableLocalRoots := make(map[string]string)
449
450         ks := RunSomeFakeKeepServers(st, 5)
451
452         for i, k := range ks {
453                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
454                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
455                 defer k.listener.Close()
456         }
457
458         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
459
460         reader, writer := io.Pipe()
461
462         go func() {
463                 writer.Write([]byte("foo"))
464                 writer.Close()
465         }()
466
467         kc.PutHR(hash, reader, 3)
468
469         shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
470
471         s1 := <-st.handled
472         s2 := <-st.handled
473
474         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
475                 (s1 == shuff[1] && s2 == shuff[0]),
476                 Equals,
477                 true)
478 }
479
480 func (s *StandaloneSuite) TestPutWithFail(c *C) {
481         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
482
483         st := &StubPutHandler{
484                 c:                    c,
485                 expectPath:           hash,
486                 expectAPIToken:       "abc123",
487                 expectBody:           "foo",
488                 expectStorageClass:   "",
489                 returnStorageClasses: "",
490                 handled:              make(chan string, 4),
491         }
492
493         fh := FailHandler{
494                 make(chan string, 1)}
495
496         arv, err := arvadosclient.MakeArvadosClient()
497         c.Check(err, IsNil)
498         kc, _ := MakeKeepClient(arv)
499
500         kc.Want_replicas = 2
501         arv.ApiToken = "abc123"
502         localRoots := make(map[string]string)
503         writableLocalRoots := make(map[string]string)
504
505         ks1 := RunSomeFakeKeepServers(st, 4)
506         ks2 := RunSomeFakeKeepServers(fh, 1)
507
508         for i, k := range ks1 {
509                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
510                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
511                 defer k.listener.Close()
512         }
513         for i, k := range ks2 {
514                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
515                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
516                 defer k.listener.Close()
517         }
518
519         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
520
521         shuff := NewRootSorter(
522                 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
523         c.Logf("%+v", shuff)
524
525         phash, replicas, err := kc.PutB([]byte("foo"))
526
527         <-fh.handled
528
529         c.Check(err, Equals, nil)
530         c.Check(phash, Equals, "")
531         c.Check(replicas, Equals, 2)
532
533         s1 := <-st.handled
534         s2 := <-st.handled
535
536         c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
537                 (s1 == shuff[2] && s2 == shuff[1]),
538                 Equals,
539                 true)
540 }
541
542 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
543         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
544
545         st := &StubPutHandler{
546                 c:                    c,
547                 expectPath:           hash,
548                 expectAPIToken:       "abc123",
549                 expectBody:           "foo",
550                 expectStorageClass:   "",
551                 returnStorageClasses: "",
552                 handled:              make(chan string, 1),
553         }
554
555         fh := FailHandler{
556                 make(chan string, 4)}
557
558         arv, err := arvadosclient.MakeArvadosClient()
559         c.Check(err, IsNil)
560         kc, _ := MakeKeepClient(arv)
561
562         kc.Want_replicas = 2
563         kc.Retries = 0
564         arv.ApiToken = "abc123"
565         localRoots := make(map[string]string)
566         writableLocalRoots := make(map[string]string)
567
568         ks1 := RunSomeFakeKeepServers(st, 1)
569         ks2 := RunSomeFakeKeepServers(fh, 4)
570
571         for i, k := range ks1 {
572                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
573                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
574                 defer k.listener.Close()
575         }
576         for i, k := range ks2 {
577                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
578                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
579                 defer k.listener.Close()
580         }
581
582         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
583
584         _, replicas, err := kc.PutB([]byte("foo"))
585
586         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
587         c.Check(replicas, Equals, 1)
588         c.Check(<-st.handled, Equals, ks1[0].url)
589 }
590
591 type StubGetHandler struct {
592         c              *C
593         expectPath     string
594         expectAPIToken string
595         httpStatus     int
596         body           []byte
597 }
598
599 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
600         sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
601         sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
602         resp.WriteHeader(sgh.httpStatus)
603         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
604         resp.Write(sgh.body)
605 }
606
607 func (s *StandaloneSuite) TestGet(c *C) {
608         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
609
610         st := StubGetHandler{
611                 c,
612                 hash,
613                 "abc123",
614                 http.StatusOK,
615                 []byte("foo")}
616
617         ks := RunFakeKeepServer(st)
618         defer ks.listener.Close()
619
620         arv, err := arvadosclient.MakeArvadosClient()
621         c.Check(err, IsNil)
622         kc, _ := MakeKeepClient(arv)
623         arv.ApiToken = "abc123"
624         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
625
626         r, n, url2, err := kc.Get(hash)
627         defer r.Close()
628         c.Check(err, Equals, nil)
629         c.Check(n, Equals, int64(3))
630         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
631
632         content, err2 := ioutil.ReadAll(r)
633         c.Check(err2, Equals, nil)
634         c.Check(content, DeepEquals, []byte("foo"))
635 }
636
637 func (s *StandaloneSuite) TestGet404(c *C) {
638         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
639
640         st := Error404Handler{make(chan string, 1)}
641
642         ks := RunFakeKeepServer(st)
643         defer ks.listener.Close()
644
645         arv, err := arvadosclient.MakeArvadosClient()
646         c.Check(err, IsNil)
647         kc, _ := MakeKeepClient(arv)
648         arv.ApiToken = "abc123"
649         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
650
651         r, n, url2, err := kc.Get(hash)
652         c.Check(err, Equals, BlockNotFound)
653         c.Check(n, Equals, int64(0))
654         c.Check(url2, Equals, "")
655         c.Check(r, Equals, nil)
656 }
657
658 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
659         st := Error404Handler{make(chan string, 1)}
660
661         ks := RunFakeKeepServer(st)
662         defer ks.listener.Close()
663
664         arv, err := arvadosclient.MakeArvadosClient()
665         c.Check(err, IsNil)
666         kc, _ := MakeKeepClient(arv)
667         arv.ApiToken = "abc123"
668         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
669
670         r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
671         c.Check(err, IsNil)
672         c.Check(n, Equals, int64(0))
673         c.Check(url2, Equals, "")
674         c.Assert(r, NotNil)
675         buf, err := ioutil.ReadAll(r)
676         c.Check(err, IsNil)
677         c.Check(buf, DeepEquals, []byte{})
678 }
679
680 func (s *StandaloneSuite) TestGetFail(c *C) {
681         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
682
683         st := FailHandler{make(chan string, 1)}
684
685         ks := RunFakeKeepServer(st)
686         defer ks.listener.Close()
687
688         arv, err := arvadosclient.MakeArvadosClient()
689         c.Check(err, IsNil)
690         kc, _ := MakeKeepClient(arv)
691         arv.ApiToken = "abc123"
692         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
693         kc.Retries = 0
694
695         r, n, url2, err := kc.Get(hash)
696         errNotFound, _ := err.(*ErrNotFound)
697         c.Check(errNotFound, NotNil)
698         c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
699         c.Check(errNotFound.Temporary(), Equals, true)
700         c.Check(n, Equals, int64(0))
701         c.Check(url2, Equals, "")
702         c.Check(r, Equals, nil)
703 }
704
705 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
706         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
707
708         st := &FailThenSucceedHandler{
709                 handled: make(chan string, 1),
710                 successhandler: StubGetHandler{
711                         c,
712                         hash,
713                         "abc123",
714                         http.StatusOK,
715                         []byte("foo")}}
716
717         ks := RunFakeKeepServer(st)
718         defer ks.listener.Close()
719
720         arv, err := arvadosclient.MakeArvadosClient()
721         c.Check(err, IsNil)
722         kc, _ := MakeKeepClient(arv)
723         arv.ApiToken = "abc123"
724         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
725
726         r, n, url2, err := kc.Get(hash)
727         defer r.Close()
728         c.Check(err, Equals, nil)
729         c.Check(n, Equals, int64(3))
730         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
731
732         content, err2 := ioutil.ReadAll(r)
733         c.Check(err2, Equals, nil)
734         c.Check(content, DeepEquals, []byte("foo"))
735
736         c.Logf("%q", st.reqIDs)
737         c.Assert(len(st.reqIDs) > 1, Equals, true)
738         for _, reqid := range st.reqIDs {
739                 c.Check(reqid, Not(Equals), "")
740                 c.Check(reqid, Equals, st.reqIDs[0])
741         }
742 }
743
744 func (s *StandaloneSuite) TestGetNetError(c *C) {
745         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
746
747         arv, err := arvadosclient.MakeArvadosClient()
748         c.Check(err, IsNil)
749         kc, _ := MakeKeepClient(arv)
750         arv.ApiToken = "abc123"
751         kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
752
753         r, n, url2, err := kc.Get(hash)
754         errNotFound, _ := err.(*ErrNotFound)
755         c.Check(errNotFound, NotNil)
756         c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
757         c.Check(errNotFound.Temporary(), Equals, true)
758         c.Check(n, Equals, int64(0))
759         c.Check(url2, Equals, "")
760         c.Check(r, Equals, nil)
761 }
762
763 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
764         uuid := "zzzzz-bi6l4-123451234512345"
765         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
766
767         // This one shouldn't be used:
768         ks0 := RunFakeKeepServer(StubGetHandler{
769                 c,
770                 "error if used",
771                 "abc123",
772                 http.StatusOK,
773                 []byte("foo")})
774         defer ks0.listener.Close()
775         // This one should be used:
776         ks := RunFakeKeepServer(StubGetHandler{
777                 c,
778                 hash + "+K@" + uuid,
779                 "abc123",
780                 http.StatusOK,
781                 []byte("foo")})
782         defer ks.listener.Close()
783
784         arv, err := arvadosclient.MakeArvadosClient()
785         c.Check(err, IsNil)
786         kc, _ := MakeKeepClient(arv)
787         arv.ApiToken = "abc123"
788         kc.SetServiceRoots(
789                 map[string]string{"x": ks0.url},
790                 nil,
791                 map[string]string{uuid: ks.url})
792
793         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
794         defer r.Close()
795         c.Check(err, Equals, nil)
796         c.Check(n, Equals, int64(3))
797         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
798
799         content, err := ioutil.ReadAll(r)
800         c.Check(err, Equals, nil)
801         c.Check(content, DeepEquals, []byte("foo"))
802 }
803
804 // Use a service hint to fetch from a local disk service, overriding
805 // rendezvous probe order.
806 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
807         uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
808         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
809
810         // This one shouldn't be used, although it appears first in
811         // rendezvous probe order:
812         ks0 := RunFakeKeepServer(StubGetHandler{
813                 c,
814                 "error if used",
815                 "abc123",
816                 http.StatusOK,
817                 []byte("foo")})
818         defer ks0.listener.Close()
819         // This one should be used:
820         ks := RunFakeKeepServer(StubGetHandler{
821                 c,
822                 hash + "+K@" + uuid,
823                 "abc123",
824                 http.StatusOK,
825                 []byte("foo")})
826         defer ks.listener.Close()
827
828         arv, err := arvadosclient.MakeArvadosClient()
829         c.Check(err, IsNil)
830         kc, _ := MakeKeepClient(arv)
831         arv.ApiToken = "abc123"
832         kc.SetServiceRoots(
833                 map[string]string{
834                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
835                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
836                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
837                         uuid:                          ks.url},
838                 nil,
839                 map[string]string{
840                         "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
841                         "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
842                         "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
843                         uuid:                          ks.url},
844         )
845
846         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
847         defer r.Close()
848         c.Check(err, Equals, nil)
849         c.Check(n, Equals, int64(3))
850         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
851
852         content, err := ioutil.ReadAll(r)
853         c.Check(err, Equals, nil)
854         c.Check(content, DeepEquals, []byte("foo"))
855 }
856
857 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
858         uuid := "zzzzz-bi6l4-123451234512345"
859         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
860
861         ksLocal := RunFakeKeepServer(StubGetHandler{
862                 c,
863                 hash + "+K@" + uuid,
864                 "abc123",
865                 http.StatusOK,
866                 []byte("foo")})
867         defer ksLocal.listener.Close()
868         ksGateway := RunFakeKeepServer(StubGetHandler{
869                 c,
870                 hash + "+K@" + uuid,
871                 "abc123",
872                 http.StatusInternalServerError,
873                 []byte("Error")})
874         defer ksGateway.listener.Close()
875
876         arv, err := arvadosclient.MakeArvadosClient()
877         c.Check(err, IsNil)
878         kc, _ := MakeKeepClient(arv)
879         arv.ApiToken = "abc123"
880         kc.SetServiceRoots(
881                 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
882                 nil,
883                 map[string]string{uuid: ksGateway.url})
884
885         r, n, uri, err := kc.Get(hash + "+K@" + uuid)
886         c.Assert(err, Equals, nil)
887         defer r.Close()
888         c.Check(n, Equals, int64(3))
889         c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
890
891         content, err := ioutil.ReadAll(r)
892         c.Check(err, Equals, nil)
893         c.Check(content, DeepEquals, []byte("foo"))
894 }
895
896 type BarHandler struct {
897         handled chan string
898 }
899
900 func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
901         resp.Write([]byte("bar"))
902         h.handled <- fmt.Sprintf("http://%s", req.Host)
903 }
904
905 func (s *StandaloneSuite) TestChecksum(c *C) {
906         foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
907         barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
908
909         st := BarHandler{make(chan string, 1)}
910
911         ks := RunFakeKeepServer(st)
912         defer ks.listener.Close()
913
914         arv, err := arvadosclient.MakeArvadosClient()
915         c.Check(err, IsNil)
916         kc, _ := MakeKeepClient(arv)
917         arv.ApiToken = "abc123"
918         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
919
920         r, n, _, err := kc.Get(barhash)
921         c.Check(err, IsNil)
922         _, err = ioutil.ReadAll(r)
923         c.Check(n, Equals, int64(3))
924         c.Check(err, Equals, nil)
925
926         <-st.handled
927
928         r, n, _, err = kc.Get(foohash)
929         c.Check(err, IsNil)
930         _, err = ioutil.ReadAll(r)
931         c.Check(n, Equals, int64(3))
932         c.Check(err, Equals, BadChecksum)
933
934         <-st.handled
935 }
936
937 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
938         content := []byte("waz")
939         hash := fmt.Sprintf("%x", md5.Sum(content))
940
941         fh := Error404Handler{
942                 make(chan string, 4)}
943
944         st := StubGetHandler{
945                 c,
946                 hash,
947                 "abc123",
948                 http.StatusOK,
949                 content}
950
951         arv, err := arvadosclient.MakeArvadosClient()
952         c.Check(err, IsNil)
953         kc, _ := MakeKeepClient(arv)
954         arv.ApiToken = "abc123"
955         localRoots := make(map[string]string)
956         writableLocalRoots := make(map[string]string)
957
958         ks1 := RunSomeFakeKeepServers(st, 1)
959         ks2 := RunSomeFakeKeepServers(fh, 4)
960
961         for i, k := range ks1 {
962                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
963                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
964                 defer k.listener.Close()
965         }
966         for i, k := range ks2 {
967                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
968                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
969                 defer k.listener.Close()
970         }
971
972         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
973         kc.Retries = 0
974
975         // This test works only if one of the failing services is
976         // attempted before the succeeding service. Otherwise,
977         // <-fh.handled below will just hang! (Probe order depends on
978         // the choice of block content "waz" and the UUIDs of the fake
979         // servers, so we just tried different strings until we found
980         // an example that passes this Assert.)
981         c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
982
983         r, n, url2, err := kc.Get(hash)
984
985         <-fh.handled
986         c.Check(err, Equals, nil)
987         c.Check(n, Equals, int64(3))
988         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
989
990         readContent, err2 := ioutil.ReadAll(r)
991         c.Check(err2, Equals, nil)
992         c.Check(readContent, DeepEquals, content)
993 }
994
995 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
996         content := []byte("TestPutGetHead")
997
998         arv, err := arvadosclient.MakeArvadosClient()
999         c.Check(err, IsNil)
1000         kc, err := MakeKeepClient(arv)
1001         c.Assert(err, Equals, nil)
1002
1003         hash := fmt.Sprintf("%x", md5.Sum(content))
1004
1005         {
1006                 n, _, err := kc.Ask(hash)
1007                 c.Check(err, Equals, BlockNotFound)
1008                 c.Check(n, Equals, int64(0))
1009         }
1010         {
1011                 hash2, replicas, err := kc.PutB(content)
1012                 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
1013                 c.Check(replicas, Equals, 2)
1014                 c.Check(err, Equals, nil)
1015         }
1016         {
1017                 r, n, url2, err := kc.Get(hash)
1018                 c.Check(err, Equals, nil)
1019                 c.Check(n, Equals, int64(len(content)))
1020                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1021
1022                 readContent, err2 := ioutil.ReadAll(r)
1023                 c.Check(err2, Equals, nil)
1024                 c.Check(readContent, DeepEquals, content)
1025         }
1026         {
1027                 n, url2, err := kc.Ask(hash)
1028                 c.Check(err, Equals, nil)
1029                 c.Check(n, Equals, int64(len(content)))
1030                 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1031         }
1032         {
1033                 loc, err := kc.LocalLocator(hash)
1034                 c.Check(err, Equals, nil)
1035                 c.Assert(len(loc) >= 32, Equals, true)
1036                 c.Check(loc[:32], Equals, hash[:32])
1037         }
1038         {
1039                 content := []byte("the perth county conspiracy")
1040                 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1041                 c.Check(loc, Equals, "")
1042                 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1043                 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
1044         }
1045 }
1046
1047 type StubProxyHandler struct {
1048         handled chan string
1049 }
1050
1051 func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1052         resp.Header().Set("X-Keep-Replicas-Stored", "2")
1053         h.handled <- fmt.Sprintf("http://%s", req.Host)
1054 }
1055
1056 func (s *StandaloneSuite) TestPutProxy(c *C) {
1057         st := StubProxyHandler{make(chan string, 1)}
1058
1059         arv, err := arvadosclient.MakeArvadosClient()
1060         c.Check(err, IsNil)
1061         kc, _ := MakeKeepClient(arv)
1062
1063         kc.Want_replicas = 2
1064         arv.ApiToken = "abc123"
1065         localRoots := make(map[string]string)
1066         writableLocalRoots := make(map[string]string)
1067
1068         ks1 := RunSomeFakeKeepServers(st, 1)
1069
1070         for i, k := range ks1 {
1071                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1072                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1073                 defer k.listener.Close()
1074         }
1075
1076         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1077
1078         _, replicas, err := kc.PutB([]byte("foo"))
1079         <-st.handled
1080
1081         c.Check(err, Equals, nil)
1082         c.Check(replicas, Equals, 2)
1083 }
1084
1085 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1086         st := StubProxyHandler{make(chan string, 1)}
1087
1088         arv, err := arvadosclient.MakeArvadosClient()
1089         c.Check(err, IsNil)
1090         kc, _ := MakeKeepClient(arv)
1091
1092         kc.Want_replicas = 3
1093         arv.ApiToken = "abc123"
1094         localRoots := make(map[string]string)
1095         writableLocalRoots := make(map[string]string)
1096
1097         ks1 := RunSomeFakeKeepServers(st, 1)
1098
1099         for i, k := range ks1 {
1100                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1101                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1102                 defer k.listener.Close()
1103         }
1104         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1105
1106         _, replicas, err := kc.PutB([]byte("foo"))
1107         <-st.handled
1108
1109         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1110         c.Check(replicas, Equals, 2)
1111 }
1112
1113 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1114         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1115         c.Check(err, Equals, nil)
1116         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1117         c.Check(l.Size, Equals, 3)
1118         c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
1119 }
1120
1121 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1122         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1123         c.Check(err, Equals, nil)
1124         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1125         c.Check(l.Size, Equals, -1)
1126         c.Check(l.Hints, DeepEquals, []string{})
1127 }
1128
1129 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1130         l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1131         c.Check(err, Equals, nil)
1132         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1133         c.Check(l.Size, Equals, -1)
1134         c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
1135 }
1136
1137 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1138         str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1139         l, err := MakeLocator(str)
1140         c.Check(err, Equals, nil)
1141         c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1142         c.Check(l.Size, Equals, 3)
1143         c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1144         c.Check(l.String(), Equals, str)
1145 }
1146
1147 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1148         _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1149         c.Check(err, Equals, InvalidLocatorError)
1150 }
1151
1152 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1153         hash := Md5String("foo")
1154
1155         st := &StubPutHandler{
1156                 c:                    c,
1157                 expectPath:           hash,
1158                 expectAPIToken:       "abc123",
1159                 expectBody:           "foo",
1160                 expectStorageClass:   "",
1161                 returnStorageClasses: "",
1162                 handled:              make(chan string, 5),
1163         }
1164
1165         arv, _ := arvadosclient.MakeArvadosClient()
1166         kc, _ := MakeKeepClient(arv)
1167
1168         kc.Want_replicas = 2
1169         arv.ApiToken = "abc123"
1170         localRoots := make(map[string]string)
1171         writableLocalRoots := make(map[string]string)
1172
1173         ks := RunSomeFakeKeepServers(st, 5)
1174
1175         for i, k := range ks {
1176                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1177                 if i == 0 {
1178                         writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1179                 }
1180                 defer k.listener.Close()
1181         }
1182
1183         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1184
1185         _, replicas, err := kc.PutB([]byte("foo"))
1186
1187         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1188         c.Check(replicas, Equals, 1)
1189
1190         c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1191 }
1192
1193 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1194         hash := Md5String("foo")
1195
1196         st := &StubPutHandler{
1197                 c:                    c,
1198                 expectPath:           hash,
1199                 expectAPIToken:       "abc123",
1200                 expectBody:           "foo",
1201                 expectStorageClass:   "",
1202                 returnStorageClasses: "",
1203                 handled:              make(chan string, 5),
1204         }
1205
1206         arv, _ := arvadosclient.MakeArvadosClient()
1207         kc, _ := MakeKeepClient(arv)
1208
1209         kc.Want_replicas = 2
1210         arv.ApiToken = "abc123"
1211         localRoots := make(map[string]string)
1212         writableLocalRoots := make(map[string]string)
1213
1214         ks := RunSomeFakeKeepServers(st, 5)
1215
1216         for i, k := range ks {
1217                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1218                 defer k.listener.Close()
1219         }
1220
1221         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1222
1223         _, replicas, err := kc.PutB([]byte("foo"))
1224
1225         c.Check(err, FitsTypeOf, InsufficientReplicasError{})
1226         c.Check(replicas, Equals, 0)
1227 }
1228
1229 type StubGetIndexHandler struct {
1230         c              *C
1231         expectPath     string
1232         expectAPIToken string
1233         httpStatus     int
1234         body           []byte
1235 }
1236
1237 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1238         h.c.Check(req.URL.Path, Equals, h.expectPath)
1239         h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1240         resp.WriteHeader(h.httpStatus)
1241         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1242         resp.Write(h.body)
1243 }
1244
1245 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1246         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1247
1248         st := StubGetIndexHandler{
1249                 c,
1250                 "/index",
1251                 "abc123",
1252                 http.StatusOK,
1253                 []byte(hash + "+3 1443559274\n\n")}
1254
1255         ks := RunFakeKeepServer(st)
1256         defer ks.listener.Close()
1257
1258         arv, err := arvadosclient.MakeArvadosClient()
1259         c.Assert(err, IsNil)
1260         kc, err := MakeKeepClient(arv)
1261         c.Assert(err, IsNil)
1262         arv.ApiToken = "abc123"
1263         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1264
1265         r, err := kc.GetIndex("x", "")
1266         c.Check(err, IsNil)
1267
1268         content, err2 := ioutil.ReadAll(r)
1269         c.Check(err2, Equals, nil)
1270         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1271 }
1272
1273 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1274         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1275
1276         st := StubGetIndexHandler{
1277                 c,
1278                 "/index/" + hash[0:3],
1279                 "abc123",
1280                 http.StatusOK,
1281                 []byte(hash + "+3 1443559274\n\n")}
1282
1283         ks := RunFakeKeepServer(st)
1284         defer ks.listener.Close()
1285
1286         arv, err := arvadosclient.MakeArvadosClient()
1287         c.Check(err, IsNil)
1288         kc, _ := MakeKeepClient(arv)
1289         arv.ApiToken = "abc123"
1290         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1291
1292         r, err := kc.GetIndex("x", hash[0:3])
1293         c.Assert(err, Equals, nil)
1294
1295         content, err2 := ioutil.ReadAll(r)
1296         c.Check(err2, Equals, nil)
1297         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1298 }
1299
1300 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1301         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1302
1303         st := StubGetIndexHandler{
1304                 c,
1305                 "/index/" + hash[0:3],
1306                 "abc123",
1307                 http.StatusOK,
1308                 []byte(hash)}
1309
1310         ks := RunFakeKeepServer(st)
1311         defer ks.listener.Close()
1312
1313         arv, err := arvadosclient.MakeArvadosClient()
1314         c.Check(err, IsNil)
1315         kc, _ := MakeKeepClient(arv)
1316         arv.ApiToken = "abc123"
1317         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1318
1319         _, err = kc.GetIndex("x", hash[0:3])
1320         c.Check(err, Equals, ErrIncompleteIndex)
1321 }
1322
1323 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1324         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1325
1326         st := StubGetIndexHandler{
1327                 c,
1328                 "/index/" + hash[0:3],
1329                 "abc123",
1330                 http.StatusOK,
1331                 []byte(hash)}
1332
1333         ks := RunFakeKeepServer(st)
1334         defer ks.listener.Close()
1335
1336         arv, err := arvadosclient.MakeArvadosClient()
1337         c.Check(err, IsNil)
1338         kc, _ := MakeKeepClient(arv)
1339         arv.ApiToken = "abc123"
1340         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1341
1342         _, err = kc.GetIndex("y", hash[0:3])
1343         c.Check(err, Equals, ErrNoSuchKeepServer)
1344 }
1345
1346 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1347         st := StubGetIndexHandler{
1348                 c,
1349                 "/index/abcd",
1350                 "abc123",
1351                 http.StatusOK,
1352                 []byte("\n")}
1353
1354         ks := RunFakeKeepServer(st)
1355         defer ks.listener.Close()
1356
1357         arv, err := arvadosclient.MakeArvadosClient()
1358         c.Check(err, IsNil)
1359         kc, _ := MakeKeepClient(arv)
1360         arv.ApiToken = "abc123"
1361         kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1362
1363         r, err := kc.GetIndex("x", "abcd")
1364         c.Check(err, Equals, nil)
1365
1366         content, err2 := ioutil.ReadAll(r)
1367         c.Check(err2, Equals, nil)
1368         c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1369 }
1370
1371 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1372         st := &FailThenSucceedHandler{
1373                 handled: make(chan string, 1),
1374                 successhandler: &StubPutHandler{
1375                         c:                    c,
1376                         expectPath:           Md5String("foo"),
1377                         expectAPIToken:       "abc123",
1378                         expectBody:           "foo",
1379                         expectStorageClass:   "",
1380                         returnStorageClasses: "",
1381                         handled:              make(chan string, 5),
1382                 },
1383         }
1384
1385         arv, _ := arvadosclient.MakeArvadosClient()
1386         kc, _ := MakeKeepClient(arv)
1387
1388         kc.Want_replicas = 2
1389         arv.ApiToken = "abc123"
1390         localRoots := make(map[string]string)
1391         writableLocalRoots := make(map[string]string)
1392
1393         ks := RunSomeFakeKeepServers(st, 2)
1394
1395         for i, k := range ks {
1396                 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1397                 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1398                 defer k.listener.Close()
1399         }
1400
1401         kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1402
1403         hash, replicas, err := kc.PutB([]byte("foo"))
1404
1405         c.Check(err, Equals, nil)
1406         c.Check(hash, Equals, "")
1407         c.Check(replicas, Equals, 2)
1408 }
1409
1410 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1411         arv, err := arvadosclient.MakeArvadosClient()
1412         c.Assert(err, Equals, nil)
1413
1414         // Add an additional "testblobstore" keepservice
1415         blobKeepService := make(arvadosclient.Dict)
1416         err = arv.Create("keep_services",
1417                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1418                         "service_host": "localhost",
1419                         "service_port": "21321",
1420                         "service_type": "testblobstore"}},
1421                 &blobKeepService)
1422         c.Assert(err, Equals, nil)
1423         defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1424         RefreshServiceDiscovery()
1425
1426         // Make a keepclient and ensure that the testblobstore is included
1427         kc, err := MakeKeepClient(arv)
1428         c.Assert(err, Equals, nil)
1429
1430         // verify kc.LocalRoots
1431         c.Check(len(kc.LocalRoots()), Equals, 3)
1432         for _, root := range kc.LocalRoots() {
1433                 c.Check(root, Matches, "http://localhost:\\d+")
1434         }
1435         c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1436
1437         // verify kc.GatewayRoots
1438         c.Check(len(kc.GatewayRoots()), Equals, 3)
1439         for _, root := range kc.GatewayRoots() {
1440                 c.Check(root, Matches, "http://localhost:\\d+")
1441         }
1442         c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1443
1444         // verify kc.WritableLocalRoots
1445         c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1446         for _, root := range kc.WritableLocalRoots() {
1447                 c.Check(root, Matches, "http://localhost:\\d+")
1448         }
1449         c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1450
1451         c.Assert(kc.replicasPerService, Equals, 0)
1452         c.Assert(kc.foundNonDiskSvc, Equals, true)
1453         c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
1454 }