1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "git.arvados.org/arvados.git/sdk/go/arvadostest"
23 "git.arvados.org/arvados.git/sdk/go/httpserver"
24 "github.com/prometheus/client_golang/prometheus"
28 // routerSuite tests that the router correctly translates HTTP
29 // requests to the appropriate keepstore functionality, and translates
30 // the results to HTTP responses.
31 type routerSuite struct {
32 cluster *arvados.Cluster
35 var _ = Suite(&routerSuite{})
37 func testRouter(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*router, context.CancelFunc) {
39 reg = prometheus.NewRegistry()
41 ctx, cancel := context.WithCancel(context.Background())
42 ks, kcancel := testKeepstore(t, cluster, reg)
47 puller := newPuller(ctx, ks, reg)
48 trasher := newTrasher(ctx, ks, reg)
49 return newRouter(ks, puller, trasher).(*router), cancel
52 func (s *routerSuite) SetUpTest(c *C) {
53 s.cluster = testCluster(c)
54 s.cluster.Volumes = map[string]arvados.Volume{
55 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass1": true}},
56 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass2": true}},
58 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
59 "testclass1": arvados.StorageClassConfig{
62 "testclass2": arvados.StorageClassConfig{
68 func (s *routerSuite) TestBlockRead_Token(c *C) {
69 router, cancel := testRouter(c, s.cluster, nil)
72 err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
74 locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
75 c.Assert(locSigned, Not(Equals), fooHash+"+3")
78 resp := call(router, "GET", "http://example/"+locSigned, "", nil, nil)
79 c.Check(resp.Code, Equals, http.StatusUnauthorized)
80 c.Check(resp.Body.String(), Matches, "no token provided in Authorization header\n")
82 // Different token => invalid signature
83 resp = call(router, "GET", "http://example/"+locSigned, "badtoken", nil, nil)
84 c.Check(resp.Code, Equals, http.StatusBadRequest)
85 c.Check(resp.Body.String(), Equals, "invalid signature\n")
88 resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
89 c.Check(resp.Code, Equals, http.StatusOK)
90 c.Check(resp.Body.String(), Equals, "foo")
93 resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
94 c.Check(resp.Code, Equals, http.StatusOK)
95 c.Check(resp.Result().ContentLength, Equals, int64(3))
96 c.Check(resp.Body.String(), Equals, "")
99 // As a special case we allow HEAD requests that only provide a hash
100 // without a size hint. This accommodates uses of keep-block-check
101 // where it's inconvenient to attach size hints to known hashes.
103 // GET requests must provide a size hint -- otherwise we can't
104 // propagate a checksum mismatch error.
105 func (s *routerSuite) TestBlockRead_NoSizeHint(c *C) {
106 s.cluster.Collections.BlobSigning = true
107 router, cancel := testRouter(c, s.cluster, nil)
109 err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
113 hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash)
114 resp := call(router, "GET", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
115 c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
117 resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
118 c.Check(resp.Code, Equals, http.StatusUnauthorized)
119 resp = call(router, "HEAD", "http://example/"+fooHash+"+3", "", nil, nil)
120 c.Check(resp.Code, Equals, http.StatusUnauthorized)
122 s.cluster.Collections.BlobSigning = false
123 router, cancel = testRouter(c, s.cluster, nil)
125 err = router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
128 resp = call(router, "GET", "http://example/"+fooHash, "", nil, nil)
129 c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
131 resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
132 c.Check(resp.Code, Equals, http.StatusOK)
133 c.Check(resp.Body.String(), Equals, "")
134 c.Check(resp.Result().ContentLength, Equals, int64(3))
135 c.Check(resp.Header().Get("Content-Length"), Equals, "3")
138 // By the time we discover the checksum mismatch, it's too late to
139 // change the response code, but the expected block size is given in
140 // the Content-Length response header, so a generic http client can
141 // detect the problem.
142 func (s *routerSuite) TestBlockRead_ChecksumMismatch(c *C) {
143 router, cancel := testRouter(c, s.cluster, nil)
146 gooddata := make([]byte, 10_000_000)
148 hash := fmt.Sprintf("%x", md5.Sum(gooddata))
149 locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fmt.Sprintf("%s+%d", hash, len(gooddata)))
151 for _, baddata := range [][]byte{
153 make([]byte, len(gooddata)),
154 make([]byte, len(gooddata)-1),
155 make([]byte, len(gooddata)+1),
156 make([]byte, len(gooddata)*2),
158 c.Logf("=== baddata len %d", len(baddata))
159 err := router.keepstore.mountsW[0].BlockWrite(context.Background(), hash, baddata)
162 resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
163 if !c.Check(resp.Code, Equals, http.StatusOK) {
164 c.Logf("resp.Body: %s", resp.Body.String())
166 c.Check(resp.Body.Len(), Not(Equals), len(gooddata))
167 c.Check(resp.Result().ContentLength, Equals, int64(len(gooddata)))
169 resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
170 c.Check(resp.Code, Equals, http.StatusBadGateway)
172 hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, hash)
173 resp = call(router, "HEAD", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
174 c.Check(resp.Code, Equals, http.StatusBadGateway)
178 func (s *routerSuite) TestBlockWrite(c *C) {
179 router, cancel := testRouter(c, s.cluster, nil)
182 resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), nil)
183 c.Check(resp.Code, Equals, http.StatusOK)
184 locator := strings.TrimSpace(resp.Body.String())
186 resp = call(router, "GET", "http://example/"+locator, arvadostest.ActiveTokenV2, nil, nil)
187 c.Check(resp.Code, Equals, http.StatusOK)
188 c.Check(resp.Body.String(), Equals, "foo")
191 func (s *routerSuite) TestBlockWrite_Headers(c *C) {
192 router, cancel := testRouter(c, s.cluster, nil)
195 resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Arvados-Replicas-Desired": []string{"2"}})
196 c.Check(resp.Code, Equals, http.StatusOK)
197 c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
198 c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), Equals, "testclass1=1")
200 resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{"testclass1"}})
201 c.Check(resp.Code, Equals, http.StatusOK)
202 c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
203 c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass1=1")
205 resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{" , testclass2 , "}})
206 c.Check(resp.Code, Equals, http.StatusOK)
207 c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
208 c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass2=1")
211 func sortCommaSeparated(s string) string {
212 slice := strings.Split(s, ", ")
214 return strings.Join(slice, ", ")
217 func (s *routerSuite) TestBlockTouch(c *C) {
218 router, cancel := testRouter(c, s.cluster, nil)
221 resp := call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
222 c.Check(resp.Code, Equals, http.StatusNotFound)
224 vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
225 err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
227 vol1 := router.keepstore.mountsW[1].volume.(*stubVolume)
228 err = vol1.BlockWrite(context.Background(), fooHash, []byte("foo"))
232 resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
233 c.Check(resp.Code, Equals, http.StatusOK)
236 // Unauthorized request is a no-op
237 resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", arvadostest.ActiveTokenV2, nil, nil)
238 c.Check(resp.Code, Equals, http.StatusForbidden)
240 // Volume 0 mtime should be updated
241 t, err := vol0.Mtime(fooHash)
243 c.Check(t.After(t1), Equals, true)
244 c.Check(t.Before(t2), Equals, true)
246 // Volume 1 mtime should not be updated
247 t, err = vol1.Mtime(fooHash)
249 c.Check(t.Before(t1), Equals, true)
251 err = vol0.BlockTrash(fooHash)
253 err = vol1.BlockTrash(fooHash)
255 resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
256 c.Check(resp.Code, Equals, http.StatusNotFound)
259 func (s *routerSuite) TestBlockTrash(c *C) {
260 router, cancel := testRouter(c, s.cluster, nil)
263 vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
264 err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
266 err = vol0.blockTouchWithTime(fooHash, time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration()))
268 resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
269 c.Check(resp.Code, Equals, http.StatusOK)
270 c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
271 err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
272 c.Assert(err, Equals, os.ErrNotExist)
275 func (s *routerSuite) TestBlockUntrash(c *C) {
276 router, cancel := testRouter(c, s.cluster, nil)
279 vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
280 err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
282 err = vol0.BlockTrash(fooHash)
284 err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
285 c.Assert(err, Equals, os.ErrNotExist)
286 resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
287 c.Check(resp.Code, Equals, http.StatusOK)
288 c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
289 err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
293 func (s *routerSuite) TestBadRequest(c *C) {
294 router, cancel := testRouter(c, s.cluster, nil)
297 for _, trial := range []string{
300 "GET /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabcdefg",
302 "GET /mounts/blocks/123",
305 "GET /debug.json", // old endpoint, no longer exists
306 "GET /status.json", // old endpoint, no longer exists
308 "POST /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
313 c.Logf("=== %s", trial)
314 methodpath := strings.Split(trial, " ")
315 req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
316 resp := httptest.NewRecorder()
317 router.ServeHTTP(resp, req)
318 c.Check(resp.Code, Equals, http.StatusBadRequest)
322 func (s *routerSuite) TestRequireAdminMgtToken(c *C) {
323 router, cancel := testRouter(c, s.cluster, nil)
326 for _, token := range []string{"badtoken", ""} {
327 for _, trial := range []string{
333 "PUT /untrash/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
335 c.Logf("=== %s", trial)
336 methodpath := strings.Split(trial, " ")
337 req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
339 req.Header.Set("Authorization", "Bearer "+token)
341 resp := httptest.NewRecorder()
342 router.ServeHTTP(resp, req)
344 c.Check(resp.Code, Equals, http.StatusUnauthorized)
346 c.Check(resp.Code, Equals, http.StatusForbidden)
350 req := httptest.NewRequest("TOUCH", "http://example/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil)
351 resp := httptest.NewRecorder()
352 router.ServeHTTP(resp, req)
353 c.Check(resp.Code, Equals, http.StatusUnauthorized)
356 func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
357 router, cancel := testRouter(c, s.cluster, nil)
359 router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
360 return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
363 // To test whether we fall back to volume 1 after volume 0
364 // returns an error, we need to use a block whose rendezvous
365 // order has volume 0 first. Luckily "bar" is such a block.
366 c.Assert(router.keepstore.rendezvous(barHash, router.keepstore.mountsR)[0].UUID, DeepEquals, router.keepstore.mountsR[0].UUID)
368 locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, barHash+"+3")
370 // Volume 0 fails with an error that specifies an HTTP status
371 // code, so that code should be propagated to caller.
372 resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
373 c.Check(resp.Code, Equals, http.StatusBadGateway)
374 c.Check(resp.Body.String(), Equals, "test error\n")
376 router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
377 return errors.New("no http status provided")
379 resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
380 c.Check(resp.Code, Equals, http.StatusInternalServerError)
381 c.Check(resp.Body.String(), Equals, "no http status provided\n")
383 c.Assert(router.keepstore.mountsW[1].volume.BlockWrite(context.Background(), barHash, []byte("bar")), IsNil)
385 // If the requested block is available on the second volume,
386 // it doesn't matter that the first volume failed.
387 resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
388 c.Check(resp.Code, Equals, http.StatusOK)
389 c.Check(resp.Body.String(), Equals, "bar")
392 func (s *routerSuite) TestIndex(c *C) {
393 router, cancel := testRouter(c, s.cluster, nil)
396 resp := call(router, "GET", "http://example/index", s.cluster.SystemRootToken, nil, nil)
397 c.Check(resp.Code, Equals, http.StatusOK)
398 c.Check(resp.Body.String(), Equals, "\n")
400 resp = call(router, "GET", "http://example/index?prefix=fff", s.cluster.SystemRootToken, nil, nil)
401 c.Check(resp.Code, Equals, http.StatusOK)
402 c.Check(resp.Body.String(), Equals, "\n")
404 t0 := time.Now().Add(-time.Hour)
405 vol0 := router.keepstore.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume)
406 err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
408 err = vol0.blockTouchWithTime(fooHash, t0)
410 err = vol0.BlockWrite(context.Background(), barHash, []byte("bar"))
412 err = vol0.blockTouchWithTime(barHash, t0)
414 t1 := time.Now().Add(-time.Minute)
415 vol1 := router.keepstore.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume)
416 err = vol1.BlockWrite(context.Background(), barHash, []byte("bar"))
418 err = vol1.blockTouchWithTime(barHash, t1)
421 for _, path := range []string{
424 "/index/?prefix=acb",
425 "/mounts/zzzzz-nyw5e-000000000000000/blocks?prefix=acb",
426 "/mounts/zzzzz-nyw5e-000000000000000/blocks/?prefix=acb",
427 "/mounts/zzzzz-nyw5e-000000000000000/blocks/acb",
429 c.Logf("=== %s", path)
430 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
431 c.Check(resp.Code, Equals, http.StatusOK)
432 c.Check(resp.Body.String(), Equals, fooHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n\n")
435 for _, path := range []string{
440 c.Logf("=== %s", path)
441 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
442 c.Check(resp.Code, Equals, http.StatusOK)
443 c.Check(resp.Body.String(), Equals, ""+
444 barHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n"+
445 barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
448 for _, path := range []string{
449 "/mounts/zzzzz-nyw5e-111111111111111/blocks",
450 "/mounts/zzzzz-nyw5e-111111111111111/blocks/",
451 "/mounts/zzzzz-nyw5e-111111111111111/blocks?prefix=37",
452 "/mounts/zzzzz-nyw5e-111111111111111/blocks/?prefix=37",
453 "/mounts/zzzzz-nyw5e-111111111111111/blocks/37",
455 c.Logf("=== %s", path)
456 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
457 c.Check(resp.Code, Equals, http.StatusOK)
458 c.Check(resp.Body.String(), Equals, barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
461 for _, path := range []string{
467 c.Logf("=== %s", path)
468 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
469 c.Check(resp.Code, Equals, http.StatusOK)
470 c.Check(strings.Split(resp.Body.String(), "\n"), HasLen, 5)
475 // Check that the context passed to a volume method gets cancelled
476 // when the http client hangs up.
477 func (s *routerSuite) TestCancelOnDisconnect(c *C) {
478 router, cancel := testRouter(c, s.cluster, nil)
481 unblock := make(chan struct{})
482 router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error {
484 c.Check(ctx.Err(), NotNil)
488 time.Sleep(time.Second / 10)
492 locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
493 ctx, cancel := context.WithCancel(context.Background())
495 req, err := http.NewRequestWithContext(ctx, "GET", "http://example/"+locSigned, nil)
497 req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
498 resp := httptest.NewRecorder()
499 router.ServeHTTP(resp, req)
500 c.Check(resp.Code, Equals, 499)
503 func call(handler http.Handler, method, path, tok string, body []byte, hdr http.Header) *httptest.ResponseRecorder {
504 resp := httptest.NewRecorder()
505 req, err := http.NewRequest(method, path, bytes.NewReader(body))
510 req.Header.Set(k, hdr.Get(k))
513 req.Header.Set("Authorization", "Bearer "+tok)
515 handler.ServeHTTP(resp, req)