Merge branch 'github-pr-224'
[arvados.git] / services / keepproxy / keepproxy_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepproxy
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io/ioutil"
13         "math/rand"
14         "net"
15         "net/http"
16         "net/http/httptest"
17         "strings"
18         "sync"
19         "testing"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/config"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/arvadostest"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "git.arvados.org/arvados.git/sdk/go/httpserver"
28         "git.arvados.org/arvados.git/sdk/go/keepclient"
29         log "github.com/sirupsen/logrus"
30
31         "gopkg.in/check.v1"
32         . "gopkg.in/check.v1"
33 )
34
35 // Gocheck boilerplate
36 func Test(t *testing.T) {
37         TestingT(t)
38 }
39
40 // Gocheck boilerplate
41 var _ = Suite(&ServerRequiredSuite{})
42
43 // Tests that require the Keep server running
44 type ServerRequiredSuite struct{}
45
46 // Gocheck boilerplate
47 var _ = Suite(&ServerRequiredConfigYmlSuite{})
48
49 // Tests that require the Keep servers running as defined in config.yml
50 type ServerRequiredConfigYmlSuite struct{}
51
52 // Gocheck boilerplate
53 var _ = Suite(&NoKeepServerSuite{})
54
55 // Test with no keepserver to simulate errors
56 type NoKeepServerSuite struct{}
57
58 var TestProxyUUID = "zzzzz-bi6l4-lrixqc4fxofbmzz"
59
60 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
61         arvadostest.StartKeep(2, false)
62 }
63
64 func (s *ServerRequiredSuite) SetUpTest(c *C) {
65         arvadostest.ResetEnv()
66 }
67
68 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
69         arvadostest.StopKeep(2)
70 }
71
72 func (s *ServerRequiredConfigYmlSuite) SetUpSuite(c *C) {
73         // config.yml defines 4 keepstores
74         arvadostest.StartKeep(4, false)
75 }
76
77 func (s *ServerRequiredConfigYmlSuite) SetUpTest(c *C) {
78         arvadostest.ResetEnv()
79 }
80
81 func (s *ServerRequiredConfigYmlSuite) TearDownSuite(c *C) {
82         arvadostest.StopKeep(4)
83 }
84
85 func (s *NoKeepServerSuite) SetUpSuite(c *C) {
86         // We need API to have some keep services listed, but the
87         // services themselves should be unresponsive.
88         arvadostest.StartKeep(2, false)
89         arvadostest.StopKeep(2)
90 }
91
92 func (s *NoKeepServerSuite) SetUpTest(c *C) {
93         arvadostest.ResetEnv()
94 }
95
96 type testServer struct {
97         *httpserver.Server
98         proxyHandler *proxyHandler
99 }
100
101 func runProxy(c *C, bogusClientToken bool, loadKeepstoresFromConfig bool, kp *arvados.UploadDownloadRolePermissions) (*testServer, *keepclient.KeepClient, *bytes.Buffer) {
102         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
103         c.Assert(err, Equals, nil)
104         cluster, err := cfg.GetCluster("")
105         c.Assert(err, Equals, nil)
106
107         if !loadKeepstoresFromConfig {
108                 // Do not load Keepstore InternalURLs from the config file
109                 cluster.Services.Keepstore.InternalURLs = make(map[arvados.URL]arvados.ServiceInstance)
110         }
111
112         cluster.Services.Keepproxy.InternalURLs = map[arvados.URL]arvados.ServiceInstance{{Host: ":0"}: {}}
113
114         if kp != nil {
115                 cluster.Collections.KeepproxyPermission = *kp
116         }
117
118         logbuf := &bytes.Buffer{}
119         logger := log.New()
120         logger.Out = logbuf
121         ctx := ctxlog.Context(context.Background(), logger)
122
123         handler := newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, nil).(*proxyHandler)
124         srv := &testServer{
125                 Server: &httpserver.Server{
126                         Server: http.Server{
127                                 BaseContext: func(net.Listener) context.Context { return ctx },
128                                 Handler: httpserver.AddRequestIDs(
129                                         httpserver.LogRequests(handler)),
130                         },
131                         Addr: ":",
132                 },
133                 proxyHandler: handler,
134         }
135         err = srv.Start()
136         c.Assert(err, IsNil)
137
138         client := arvados.NewClientFromEnv()
139         arv, err := arvadosclient.New(client)
140         c.Assert(err, IsNil)
141         if bogusClientToken {
142                 arv.ApiToken = "bogus-token"
143         }
144         kc := keepclient.New(arv)
145         kc.DiskCacheSize = keepclient.DiskCacheDisabled
146         sr := map[string]string{
147                 TestProxyUUID: "http://" + srv.Addr,
148         }
149         kc.SetServiceRoots(sr, sr, sr)
150         return srv, kc, logbuf
151 }
152
153 func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
154         srv, _, _ := runProxy(c, false, false, nil)
155         defer srv.Close()
156
157         req, err := http.NewRequest("POST",
158                 "http://"+srv.Addr+"/",
159                 strings.NewReader("TestViaHeader"))
160         c.Assert(err, Equals, nil)
161         req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
162         resp, err := (&http.Client{}).Do(req)
163         c.Assert(err, Equals, nil)
164         c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
165         c.Assert(resp.StatusCode, Equals, http.StatusOK)
166         locator, err := ioutil.ReadAll(resp.Body)
167         c.Assert(err, Equals, nil)
168         resp.Body.Close()
169
170         req, err = http.NewRequest("GET",
171                 "http://"+srv.Addr+"/"+string(locator),
172                 nil)
173         c.Assert(err, Equals, nil)
174         resp, err = (&http.Client{}).Do(req)
175         c.Assert(err, Equals, nil)
176         c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
177         resp.Body.Close()
178 }
179
180 func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
181         srv, kc, _ := runProxy(c, false, false, nil)
182         defer srv.Close()
183
184         sr := map[string]string{
185                 TestProxyUUID: "http://" + srv.Addr,
186         }
187         srv.proxyHandler.KeepClient.SetServiceRoots(sr, sr, sr)
188
189         content := []byte("TestLoopDetection")
190         _, _, err := kc.PutB(content)
191         c.Check(err, ErrorMatches, `.*loop detected.*`)
192
193         hash := fmt.Sprintf("%x", md5.Sum(content))
194         _, _, _, err = kc.Get(hash)
195         c.Check(err, ErrorMatches, `.*loop detected.*`)
196 }
197
198 func (s *ServerRequiredSuite) TestStorageClassesHeader(c *C) {
199         srv, kc, _ := runProxy(c, false, false, nil)
200         defer srv.Close()
201
202         // Set up fake keepstore to record request headers
203         var hdr http.Header
204         ts := httptest.NewServer(http.HandlerFunc(
205                 func(w http.ResponseWriter, r *http.Request) {
206                         hdr = r.Header
207                         http.Error(w, "Error", http.StatusInternalServerError)
208                 }))
209         defer ts.Close()
210
211         // Point keepproxy router's keepclient to the fake keepstore
212         sr := map[string]string{
213                 TestProxyUUID: ts.URL,
214         }
215         srv.proxyHandler.KeepClient.SetServiceRoots(sr, sr, sr)
216
217         // Set up client to ask for storage classes to keepproxy
218         kc.StorageClasses = []string{"secure"}
219         content := []byte("Very important data")
220         _, _, err := kc.PutB(content)
221         c.Check(err, NotNil)
222         c.Check(hdr.Get("X-Keep-Storage-Classes"), Equals, "secure")
223 }
224
225 func (s *ServerRequiredSuite) TestStorageClassesConfirmedHeader(c *C) {
226         srv, _, _ := runProxy(c, false, false, nil)
227         defer srv.Close()
228
229         content := []byte("foo")
230         hash := fmt.Sprintf("%x", md5.Sum(content))
231         client := &http.Client{}
232
233         req, err := http.NewRequest("PUT",
234                 fmt.Sprintf("http://%s/%s", srv.Addr, hash),
235                 bytes.NewReader(content))
236         c.Assert(err, IsNil)
237         req.Header.Set("X-Keep-Storage-Classes", "default")
238         req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
239         req.Header.Set("Content-Type", "application/octet-stream")
240
241         resp, err := client.Do(req)
242         c.Assert(err, IsNil)
243         c.Assert(resp.StatusCode, Equals, http.StatusOK)
244         c.Assert(resp.Header.Get("X-Keep-Storage-Classes-Confirmed"), Equals, "default=2")
245 }
246
247 func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
248         srv, kc, _ := runProxy(c, false, false, nil)
249         defer srv.Close()
250
251         content := []byte("TestDesiredReplicas")
252         hash := fmt.Sprintf("%x", md5.Sum(content))
253
254         for _, kc.Want_replicas = range []int{0, 1, 2, 3} {
255                 locator, rep, err := kc.PutB(content)
256                 if kc.Want_replicas < 3 {
257                         c.Check(err, Equals, nil)
258                         c.Check(rep, Equals, kc.Want_replicas)
259                         if rep > 0 {
260                                 c.Check(locator, Matches, fmt.Sprintf(`^%s\+%d(\+.+)?$`, hash, len(content)))
261                         }
262                 } else {
263                         c.Check(err, ErrorMatches, ".*503.*")
264                 }
265         }
266 }
267
268 func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
269         srv, kc, _ := runProxy(c, false, false, nil)
270         defer srv.Close()
271
272         content := []byte("TestPutWrongContentLength")
273         hash := fmt.Sprintf("%x", md5.Sum(content))
274
275         // If we use http.Client to send these requests to the network
276         // server we just started, the Go http library automatically
277         // fixes the invalid Content-Length header. In order to test
278         // our server behavior, we have to call the handler directly
279         // using an httptest.ResponseRecorder.
280         rtr, err := newHandler(context.Background(), kc, 10*time.Second, &arvados.Cluster{})
281         c.Assert(err, check.IsNil)
282
283         type testcase struct {
284                 sendLength   string
285                 expectStatus int
286         }
287
288         for _, t := range []testcase{
289                 {"1", http.StatusBadRequest},
290                 {"", http.StatusLengthRequired},
291                 {"-1", http.StatusLengthRequired},
292                 {"abcdef", http.StatusLengthRequired},
293         } {
294                 req, err := http.NewRequest("PUT",
295                         fmt.Sprintf("http://%s/%s+%d", srv.Addr, hash, len(content)),
296                         bytes.NewReader(content))
297                 c.Assert(err, IsNil)
298                 req.Header.Set("Content-Length", t.sendLength)
299                 req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
300                 req.Header.Set("Content-Type", "application/octet-stream")
301
302                 resp := httptest.NewRecorder()
303                 rtr.ServeHTTP(resp, req)
304                 c.Check(resp.Code, Equals, t.expectStatus)
305         }
306 }
307
308 func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
309         srv, kc, _ := runProxy(c, false, false, nil)
310         defer srv.Close()
311         srv.proxyHandler.timeout = time.Nanosecond
312
313         buf := make([]byte, 1<<20)
314         rand.Read(buf)
315         var wg sync.WaitGroup
316         for i := 0; i < 128; i++ {
317                 wg.Add(1)
318                 go func() {
319                         defer wg.Done()
320                         kc.PutB(buf)
321                 }()
322         }
323         done := make(chan bool)
324         go func() {
325                 wg.Wait()
326                 close(done)
327         }()
328         select {
329         case <-done:
330         case <-time.After(10 * time.Second):
331                 c.Error("timeout")
332         }
333 }
334
335 func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
336         srv, kc, logbuf := runProxy(c, false, false, nil)
337         defer srv.Close()
338
339         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
340         var hash2 string
341
342         {
343                 _, _, err := kc.Ask(hash)
344                 c.Check(err, Equals, keepclient.BlockNotFound)
345                 c.Log("Finished Ask (expected BlockNotFound)")
346         }
347
348         {
349                 reader, _, _, err := kc.Get(hash)
350                 c.Check(reader, Equals, nil)
351                 c.Check(err, Equals, keepclient.BlockNotFound)
352                 c.Log("Finished Get (expected BlockNotFound)")
353         }
354
355         // Note in bug #5309 among other errors keepproxy would set
356         // Content-Length incorrectly on the 404 BlockNotFound response, this
357         // would result in a protocol violation that would prevent reuse of the
358         // connection, which would manifest by the next attempt to use the
359         // connection (in this case the PutB below) failing.  So to test for
360         // that bug it's necessary to trigger an error response (such as
361         // BlockNotFound) and then do something else with the same httpClient
362         // connection.
363
364         {
365                 var rep int
366                 var err error
367                 hash2, rep, err = kc.PutB([]byte("foo"))
368                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
369                 c.Check(rep, Equals, 2)
370                 c.Check(err, Equals, nil)
371                 c.Log("Finished PutB (expected success)")
372
373                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
374                 logbuf.Reset()
375         }
376
377         {
378                 blocklen, _, err := kc.Ask(hash2)
379                 c.Assert(err, Equals, nil)
380                 c.Check(blocklen, Equals, int64(3))
381                 c.Log("Finished Ask (expected success)")
382                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
383                 logbuf.Reset()
384         }
385
386         {
387                 reader, blocklen, _, err := kc.Get(hash2)
388                 c.Assert(err, Equals, nil)
389                 all, err := ioutil.ReadAll(reader)
390                 c.Check(err, IsNil)
391                 c.Check(all, DeepEquals, []byte("foo"))
392                 c.Check(blocklen, Equals, int64(3))
393                 c.Log("Finished Get (expected success)")
394                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
395                 logbuf.Reset()
396         }
397
398         {
399                 var rep int
400                 var err error
401                 hash2, rep, err = kc.PutB([]byte(""))
402                 c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
403                 c.Check(rep, Equals, 2)
404                 c.Check(err, Equals, nil)
405                 c.Log("Finished PutB zero block")
406         }
407
408         {
409                 reader, blocklen, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e")
410                 c.Assert(err, IsNil)
411                 all, err := ioutil.ReadAll(reader)
412                 c.Check(err, IsNil)
413                 c.Check(all, DeepEquals, []byte(""))
414                 c.Check(blocklen, Equals, int64(0))
415                 c.Log("Finished Get zero block")
416         }
417 }
418
419 func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
420         srv, kc, _ := runProxy(c, true, false, nil)
421         defer srv.Close()
422
423         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
424
425         _, _, err := kc.Ask(hash)
426         c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
427
428         hash2, rep, err := kc.PutB([]byte("bar"))
429         c.Check(hash2, Equals, "")
430         c.Check(rep, Equals, 0)
431         c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError{})
432
433         blocklen, _, err := kc.Ask(hash)
434         c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
435         c.Check(err, ErrorMatches, ".*HTTP 403.*")
436         c.Check(blocklen, Equals, int64(0))
437
438         _, blocklen, _, err = kc.Get(hash)
439         c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
440         c.Check(err, ErrorMatches, ".*HTTP 403.*")
441         c.Check(blocklen, Equals, int64(0))
442 }
443
444 func testPermission(c *C, admin bool, perm arvados.UploadDownloadPermission) {
445         kp := arvados.UploadDownloadRolePermissions{}
446         if admin {
447                 kp.Admin = perm
448                 kp.User = arvados.UploadDownloadPermission{Upload: true, Download: true}
449         } else {
450                 kp.Admin = arvados.UploadDownloadPermission{Upload: true, Download: true}
451                 kp.User = perm
452         }
453
454         srv, kc, logbuf := runProxy(c, false, false, &kp)
455         defer srv.Close()
456         if admin {
457                 kc.Arvados.ApiToken = arvadostest.AdminToken
458         } else {
459                 kc.Arvados.ApiToken = arvadostest.ActiveToken
460         }
461
462         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
463         var hash2 string
464
465         {
466                 var rep int
467                 var err error
468                 hash2, rep, err = kc.PutB([]byte("foo"))
469
470                 if perm.Upload {
471                         c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
472                         c.Check(rep, Equals, 2)
473                         c.Check(err, Equals, nil)
474                         c.Log("Finished PutB (expected success)")
475                         if admin {
476                                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
477                         } else {
478
479                                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="Active User".* userUUID=zzzzz-tpzed-xurymjxw79nv3jz.*`)
480                         }
481                 } else {
482                         c.Check(hash2, Equals, "")
483                         c.Check(rep, Equals, 0)
484                         c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError{})
485                 }
486                 logbuf.Reset()
487         }
488         if perm.Upload {
489                 // can't test download without upload.
490
491                 reader, blocklen, _, err := kc.Get(hash2)
492                 if perm.Download {
493                         c.Assert(err, Equals, nil)
494                         all, err := ioutil.ReadAll(reader)
495                         c.Check(err, IsNil)
496                         c.Check(all, DeepEquals, []byte("foo"))
497                         c.Check(blocklen, Equals, int64(3))
498                         c.Log("Finished Get (expected success)")
499                         if admin {
500                                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
501                         } else {
502                                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="Active User".* userUUID=zzzzz-tpzed-xurymjxw79nv3jz.*`)
503                         }
504                 } else {
505                         c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
506                         c.Check(err, ErrorMatches, ".*Missing or invalid Authorization header, or method not allowed.*")
507                         c.Check(blocklen, Equals, int64(0))
508                 }
509                 logbuf.Reset()
510         }
511
512 }
513
514 func (s *ServerRequiredSuite) TestPutGetPermission(c *C) {
515
516         for _, adminperm := range []bool{true, false} {
517                 for _, userperm := range []bool{true, false} {
518
519                         testPermission(c, true,
520                                 arvados.UploadDownloadPermission{
521                                         Upload:   adminperm,
522                                         Download: true,
523                                 })
524                         testPermission(c, true,
525                                 arvados.UploadDownloadPermission{
526                                         Upload:   true,
527                                         Download: adminperm,
528                                 })
529                         testPermission(c, false,
530                                 arvados.UploadDownloadPermission{
531                                         Upload:   true,
532                                         Download: userperm,
533                                 })
534                         testPermission(c, false,
535                                 arvados.UploadDownloadPermission{
536                                         Upload:   true,
537                                         Download: userperm,
538                                 })
539                 }
540         }
541 }
542
543 func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
544         srv, _, _ := runProxy(c, false, false, nil)
545         defer srv.Close()
546
547         {
548                 client := http.Client{}
549                 req, err := http.NewRequest("OPTIONS",
550                         fmt.Sprintf("http://%s/%x+3", srv.Addr, md5.Sum([]byte("foo"))),
551                         nil)
552                 c.Assert(err, IsNil)
553                 req.Header.Add("Access-Control-Request-Method", "PUT")
554                 req.Header.Add("Access-Control-Request-Headers", "Authorization, X-Keep-Desired-Replicas")
555                 resp, err := client.Do(req)
556                 c.Check(err, Equals, nil)
557                 c.Check(resp.StatusCode, Equals, 200)
558                 body, err := ioutil.ReadAll(resp.Body)
559                 c.Check(err, IsNil)
560                 c.Check(string(body), Equals, "")
561                 c.Check(resp.Header.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, POST, PUT, OPTIONS")
562                 c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
563         }
564
565         {
566                 resp, err := http.Get(fmt.Sprintf("http://%s/%x+3", srv.Addr, md5.Sum([]byte("foo"))))
567                 c.Check(err, Equals, nil)
568                 c.Check(resp.Header.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
569                 c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
570         }
571 }
572
573 func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
574         srv, _, _ := runProxy(c, false, false, nil)
575         defer srv.Close()
576
577         {
578                 client := http.Client{}
579                 req, err := http.NewRequest("POST",
580                         "http://"+srv.Addr+"/",
581                         strings.NewReader("qux"))
582                 c.Check(err, IsNil)
583                 req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
584                 req.Header.Add("Content-Type", "application/octet-stream")
585                 resp, err := client.Do(req)
586                 c.Check(err, Equals, nil)
587                 body, err := ioutil.ReadAll(resp.Body)
588                 c.Check(err, Equals, nil)
589                 c.Check(string(body), Matches,
590                         fmt.Sprintf(`^%x\+3(\+.+)?$`, md5.Sum([]byte("qux"))))
591         }
592 }
593
594 func (s *ServerRequiredSuite) TestStripHint(c *C) {
595         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz", "$1"),
596                 Equals,
597                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
598         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73", "$1"),
599                 Equals,
600                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
601         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz", "$1"),
602                 Equals,
603                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz")
604         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73", "$1"),
605                 Equals,
606                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
607
608 }
609
610 // Test GetIndex
611 // - Put one block, with 2 replicas
612 // - With no prefix (expect the block locator, twice)
613 // - With an existing prefix (expect the block locator, twice)
614 // - With a valid but non-existing prefix (expect "\n")
615 // - With an invalid prefix (expect error)
616 func (s *ServerRequiredSuite) TestGetIndex(c *C) {
617         getIndexWorker(c, false)
618 }
619
620 // Test GetIndex
621 // - Uses config.yml
622 // - Put one block, with 2 replicas
623 // - With no prefix (expect the block locator, twice)
624 // - With an existing prefix (expect the block locator, twice)
625 // - With a valid but non-existing prefix (expect "\n")
626 // - With an invalid prefix (expect error)
627 func (s *ServerRequiredConfigYmlSuite) TestGetIndex(c *C) {
628         getIndexWorker(c, true)
629 }
630
631 func getIndexWorker(c *C, useConfig bool) {
632         srv, kc, _ := runProxy(c, false, useConfig, nil)
633         defer srv.Close()
634
635         // Put "index-data" blocks
636         data := []byte("index-data")
637         hash := fmt.Sprintf("%x", md5.Sum(data))
638
639         hash2, rep, err := kc.PutB(data)
640         c.Check(hash2, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, hash))
641         c.Check(rep, Equals, 2)
642         c.Check(err, Equals, nil)
643
644         reader, blocklen, _, err := kc.Get(hash2)
645         c.Assert(err, IsNil)
646         c.Check(blocklen, Equals, int64(10))
647         all, err := ioutil.ReadAll(reader)
648         c.Assert(err, IsNil)
649         c.Check(all, DeepEquals, data)
650
651         // Put some more blocks
652         _, _, err = kc.PutB([]byte("some-more-index-data"))
653         c.Check(err, IsNil)
654
655         kc.Arvados.ApiToken = arvadostest.SystemRootToken
656
657         // Invoke GetIndex
658         for _, spec := range []struct {
659                 prefix         string
660                 expectTestHash bool
661                 expectOther    bool
662         }{
663                 {"", true, true},         // with no prefix
664                 {hash[:3], true, false},  // with matching prefix
665                 {"abcdef", false, false}, // with no such prefix
666         } {
667                 indexReader, err := kc.GetIndex(TestProxyUUID, spec.prefix)
668                 c.Assert(err, Equals, nil)
669                 indexResp, err := ioutil.ReadAll(indexReader)
670                 c.Assert(err, Equals, nil)
671                 locators := strings.Split(string(indexResp), "\n")
672                 gotTestHash := 0
673                 gotOther := 0
674                 for _, locator := range locators {
675                         if locator == "" {
676                                 continue
677                         }
678                         c.Check(locator[:len(spec.prefix)], Equals, spec.prefix)
679                         if locator[:32] == hash {
680                                 gotTestHash++
681                         } else {
682                                 gotOther++
683                         }
684                 }
685                 c.Check(gotTestHash == 2, Equals, spec.expectTestHash)
686                 c.Check(gotOther > 0, Equals, spec.expectOther)
687         }
688
689         // GetIndex with invalid prefix
690         _, err = kc.GetIndex(TestProxyUUID, "xyz")
691         c.Assert((err != nil), Equals, true)
692 }
693
694 func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
695         srv, kc, _ := runProxy(c, false, false, nil)
696         defer srv.Close()
697         hash, _, err := kc.PutB([]byte("shareddata"))
698         c.Check(err, IsNil)
699         kc.Arvados.ApiToken = arvadostest.FooFileCollectionSharingToken
700         rdr, _, _, err := kc.Get(hash)
701         c.Assert(err, IsNil)
702         data, err := ioutil.ReadAll(rdr)
703         c.Check(err, IsNil)
704         c.Check(data, DeepEquals, []byte("shareddata"))
705 }
706
707 func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
708         srv, kc, _ := runProxy(c, false, false, nil)
709         defer srv.Close()
710
711         // Put a test block
712         hash, rep, err := kc.PutB([]byte("foo"))
713         c.Check(err, IsNil)
714         c.Check(rep, Equals, 2)
715
716         for _, badToken := range []string{
717                 "nosuchtoken",
718                 "2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx", // expired
719         } {
720                 kc.Arvados.ApiToken = badToken
721
722                 // Ask and Get will fail only if the upstream
723                 // keepstore server checks for valid signatures.
724                 // Without knowing the blob signing key, there is no
725                 // way for keepproxy to know whether a given token is
726                 // permitted to read a block.  So these tests fail:
727                 if false {
728                         _, _, err = kc.Ask(hash)
729                         c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{})
730                         c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, false)
731                         c.Check(err, ErrorMatches, ".*HTTP 403.*")
732
733                         _, _, _, err = kc.Get(hash)
734                         c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{})
735                         c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, false)
736                         c.Check(err, ErrorMatches, ".*HTTP 403 \"Missing or invalid Authorization header, or method not allowed\".*")
737                 }
738
739                 _, _, err = kc.PutB([]byte("foo"))
740                 c.Check(err, ErrorMatches, ".*403.*Missing or invalid Authorization header, or method not allowed")
741         }
742 }
743
744 func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
745         srv, kc, _ := runProxy(c, false, false, nil)
746         defer srv.Close()
747
748         // Point keepproxy at a non-existent keepstore
749         locals := map[string]string{
750                 TestProxyUUID: "http://localhost:12345",
751         }
752         srv.proxyHandler.KeepClient.SetServiceRoots(locals, nil, nil)
753
754         // Ask should result in temporary bad gateway error
755         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
756         _, _, err := kc.Ask(hash)
757         c.Check(err, NotNil)
758         errNotFound, _ := err.(*keepclient.ErrNotFound)
759         c.Check(errNotFound.Temporary(), Equals, true)
760         c.Assert(err, ErrorMatches, ".*HTTP 502.*")
761
762         // Get should result in temporary bad gateway error
763         _, _, _, err = kc.Get(hash)
764         c.Check(err, NotNil)
765         errNotFound, _ = err.(*keepclient.ErrNotFound)
766         c.Check(errNotFound.Temporary(), Equals, true)
767         c.Assert(err, ErrorMatches, ".*HTTP 502.*")
768 }
769
770 func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
771         srv, kc, _ := runProxy(c, false, false, nil)
772         defer srv.Close()
773
774         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
775         for _, f := range []func() error{
776                 func() error {
777                         _, _, err := kc.Ask(hash)
778                         return err
779                 },
780                 func() error {
781                         _, _, _, err := kc.Get(hash)
782                         return err
783                 },
784         } {
785                 err := f()
786                 c.Check(err, NotNil)
787                 errNotFound, _ := err.(*keepclient.ErrNotFound)
788                 if c.Check(errNotFound, NotNil) {
789                         c.Check(errNotFound.Temporary(), Equals, true)
790                         c.Check(err, ErrorMatches, `.*HTTP 502.*`)
791                 }
792         }
793 }
794
795 func (s *ServerRequiredSuite) TestPing(c *C) {
796         srv, kc, _ := runProxy(c, false, false, nil)
797         defer srv.Close()
798
799         rtr, err := newHandler(context.Background(), kc, 10*time.Second, &arvados.Cluster{ManagementToken: arvadostest.ManagementToken})
800         c.Assert(err, check.IsNil)
801
802         req, err := http.NewRequest("GET",
803                 "http://"+srv.Addr+"/_health/ping",
804                 nil)
805         c.Assert(err, IsNil)
806         req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
807
808         resp := httptest.NewRecorder()
809         rtr.ServeHTTP(resp, req)
810         c.Check(resp.Code, Equals, 200)
811         c.Assert(resp.Body.String(), Matches, `{"health":"OK"}\n?`)
812 }