Merge branch '22132-scheduling-status-log' refs #22132
[arvados.git] / services / keepstore / router_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "sort"
18         "strings"
19         "time"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadostest"
23         "git.arvados.org/arvados.git/sdk/go/httpserver"
24         "github.com/prometheus/client_golang/prometheus"
25         . "gopkg.in/check.v1"
26 )
27
28 // routerSuite tests that the router correctly translates HTTP
29 // requests to the appropriate keepstore functionality, and translates
30 // the results to HTTP responses.
31 type routerSuite struct {
32         cluster *arvados.Cluster
33 }
34
35 var _ = Suite(&routerSuite{})
36
37 func testRouter(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*router, context.CancelFunc) {
38         if reg == nil {
39                 reg = prometheus.NewRegistry()
40         }
41         ctx, cancel := context.WithCancel(context.Background())
42         ks, kcancel := testKeepstore(t, cluster, reg)
43         go func() {
44                 <-ctx.Done()
45                 kcancel()
46         }()
47         puller := newPuller(ctx, ks, reg)
48         trasher := newTrasher(ctx, ks, reg)
49         return newRouter(ks, puller, trasher).(*router), cancel
50 }
51
52 func (s *routerSuite) SetUpTest(c *C) {
53         s.cluster = testCluster(c)
54         s.cluster.Volumes = map[string]arvados.Volume{
55                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass1": true}},
56                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass2": true}},
57         }
58         s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
59                 "testclass1": arvados.StorageClassConfig{
60                         Default: true,
61                 },
62                 "testclass2": arvados.StorageClassConfig{
63                         Default: true,
64                 },
65         }
66 }
67
68 func (s *routerSuite) TestBlockRead_Token(c *C) {
69         router, cancel := testRouter(c, s.cluster, nil)
70         defer cancel()
71
72         err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
73         c.Assert(err, IsNil)
74         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
75         c.Assert(locSigned, Not(Equals), fooHash+"+3")
76
77         // No token provided
78         resp := call(router, "GET", "http://example/"+locSigned, "", nil, nil)
79         c.Check(resp.Code, Equals, http.StatusUnauthorized)
80         c.Check(resp.Body.String(), Matches, "no token provided in Authorization header\n")
81         checkCORSHeaders(c, resp.Header())
82
83         // Different token => invalid signature
84         resp = call(router, "GET", "http://example/"+locSigned, "badtoken", nil, nil)
85         c.Check(resp.Code, Equals, http.StatusBadRequest)
86         c.Check(resp.Body.String(), Equals, "invalid signature\n")
87         checkCORSHeaders(c, resp.Header())
88
89         // Correct token
90         resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
91         c.Check(resp.Code, Equals, http.StatusOK)
92         c.Check(resp.Body.String(), Equals, "foo")
93         checkCORSHeaders(c, resp.Header())
94
95         // HEAD
96         resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
97         c.Check(resp.Code, Equals, http.StatusOK)
98         c.Check(resp.Result().ContentLength, Equals, int64(3))
99         c.Check(resp.Body.String(), Equals, "")
100         checkCORSHeaders(c, resp.Header())
101 }
102
103 // Previous versions responded to "GET //locator" with a 301 redirect
104 // to "/locator".  To preserve compatibility with
105 // clients/configurations that depended on this, we now accept "GET
106 // //locator" as a synonym of "GET /locator", i.e., we respond with
107 // the data instead of a redirect.
108 //
109 // More generally, requests with double slashes are not accepted (see
110 // TestBadRequest).
111 func (s *routerSuite) TestBlockRead_DoubleSlash(c *C) {
112         router, cancel := testRouter(c, s.cluster, nil)
113         defer cancel()
114
115         err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
116         c.Assert(err, IsNil)
117         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
118         c.Assert(locSigned, Not(Equals), fooHash+"+3")
119
120         resp := call(router, "GET", "http://example//"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
121         c.Check(resp.Code, Equals, http.StatusOK)
122         c.Check(resp.Body.String(), Equals, "foo")
123         checkCORSHeaders(c, resp.Header())
124
125         // HEAD
126         resp = call(router, "HEAD", "http://example//"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
127         c.Check(resp.Code, Equals, http.StatusOK)
128         c.Check(resp.Result().ContentLength, Equals, int64(3))
129         c.Check(resp.Body.String(), Equals, "")
130         checkCORSHeaders(c, resp.Header())
131 }
132
133 // As a special case we allow HEAD requests that only provide a hash
134 // without a size hint. This accommodates uses of keep-block-check
135 // where it's inconvenient to attach size hints to known hashes.
136 //
137 // GET requests must provide a size hint -- otherwise we can't
138 // propagate a checksum mismatch error.
139 func (s *routerSuite) TestBlockRead_NoSizeHint(c *C) {
140         s.cluster.Collections.BlobSigning = true
141         router, cancel := testRouter(c, s.cluster, nil)
142         defer cancel()
143         err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
144         c.Assert(err, IsNil)
145
146         // hash+signature
147         hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash)
148         resp := call(router, "GET", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
149         c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
150
151         resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
152         c.Check(resp.Code, Equals, http.StatusUnauthorized)
153         resp = call(router, "HEAD", "http://example/"+fooHash+"+3", "", nil, nil)
154         c.Check(resp.Code, Equals, http.StatusUnauthorized)
155
156         s.cluster.Collections.BlobSigning = false
157         router, cancel = testRouter(c, s.cluster, nil)
158         defer cancel()
159         err = router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
160         c.Assert(err, IsNil)
161
162         resp = call(router, "GET", "http://example/"+fooHash, "", nil, nil)
163         c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
164
165         resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
166         c.Check(resp.Code, Equals, http.StatusOK)
167         c.Check(resp.Body.String(), Equals, "")
168         c.Check(resp.Result().ContentLength, Equals, int64(3))
169         c.Check(resp.Header().Get("Content-Length"), Equals, "3")
170 }
171
172 // By the time we discover the checksum mismatch, it's too late to
173 // change the response code, but the expected block size is given in
174 // the Content-Length response header, so a generic http client can
175 // detect the problem.
176 func (s *routerSuite) TestBlockRead_ChecksumMismatch(c *C) {
177         router, cancel := testRouter(c, s.cluster, nil)
178         defer cancel()
179
180         gooddata := make([]byte, 10_000_000)
181         gooddata[0] = 'a'
182         hash := fmt.Sprintf("%x", md5.Sum(gooddata))
183         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fmt.Sprintf("%s+%d", hash, len(gooddata)))
184
185         for _, baddata := range [][]byte{
186                 make([]byte, 3),
187                 make([]byte, len(gooddata)),
188                 make([]byte, len(gooddata)-1),
189                 make([]byte, len(gooddata)+1),
190                 make([]byte, len(gooddata)*2),
191         } {
192                 c.Logf("=== baddata len %d", len(baddata))
193                 err := router.keepstore.mountsW[0].BlockWrite(context.Background(), hash, baddata)
194                 c.Assert(err, IsNil)
195
196                 resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
197                 if !c.Check(resp.Code, Equals, http.StatusOK) {
198                         c.Logf("resp.Body: %s", resp.Body.String())
199                 }
200                 c.Check(resp.Body.Len(), Not(Equals), len(gooddata))
201                 c.Check(resp.Result().ContentLength, Equals, int64(len(gooddata)))
202                 checkCORSHeaders(c, resp.Header())
203
204                 resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
205                 c.Check(resp.Code, Equals, http.StatusBadGateway)
206                 checkCORSHeaders(c, resp.Header())
207
208                 hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, hash)
209                 resp = call(router, "HEAD", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
210                 c.Check(resp.Code, Equals, http.StatusBadGateway)
211                 checkCORSHeaders(c, resp.Header())
212         }
213 }
214
215 func (s *routerSuite) TestBlockWrite(c *C) {
216         router, cancel := testRouter(c, s.cluster, nil)
217         defer cancel()
218
219         resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), nil)
220         c.Check(resp.Code, Equals, http.StatusOK)
221         checkCORSHeaders(c, resp.Header())
222         locator := strings.TrimSpace(resp.Body.String())
223
224         resp = call(router, "GET", "http://example/"+locator, arvadostest.ActiveTokenV2, nil, nil)
225         c.Check(resp.Code, Equals, http.StatusOK)
226         c.Check(resp.Body.String(), Equals, "foo")
227 }
228
229 func (s *routerSuite) TestBlockWrite_Headers(c *C) {
230         router, cancel := testRouter(c, s.cluster, nil)
231         defer cancel()
232
233         resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Desired-Replicas": []string{"2"}})
234         c.Check(resp.Code, Equals, http.StatusOK)
235         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
236         c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), Equals, "testclass1=1")
237
238         resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{"testclass1"}})
239         c.Check(resp.Code, Equals, http.StatusOK)
240         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
241         c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass1=1")
242
243         resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{" , testclass2 , "}})
244         c.Check(resp.Code, Equals, http.StatusOK)
245         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
246         c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass2=1")
247
248         resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{"testclass1, testclass2"}})
249         c.Check(resp.Code, Equals, http.StatusOK)
250         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "2")
251         confirmed := strings.Split(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), ", ")
252         sort.Strings(confirmed)
253         c.Check(confirmed, DeepEquals, []string{"testclass1=1", "testclass2=1"})
254 }
255
256 func sortCommaSeparated(s string) string {
257         slice := strings.Split(s, ", ")
258         sort.Strings(slice)
259         return strings.Join(slice, ", ")
260 }
261
262 func (s *routerSuite) TestBlockTouch(c *C) {
263         router, cancel := testRouter(c, s.cluster, nil)
264         defer cancel()
265
266         resp := call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
267         c.Check(resp.Code, Equals, http.StatusNotFound)
268
269         vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
270         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
271         c.Assert(err, IsNil)
272         vol1 := router.keepstore.mountsW[1].volume.(*stubVolume)
273         err = vol1.BlockWrite(context.Background(), fooHash, []byte("foo"))
274         c.Assert(err, IsNil)
275
276         t1 := time.Now()
277         resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
278         c.Check(resp.Code, Equals, http.StatusOK)
279         t2 := time.Now()
280
281         // Unauthorized request is a no-op
282         resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", arvadostest.ActiveTokenV2, nil, nil)
283         c.Check(resp.Code, Equals, http.StatusForbidden)
284
285         // Volume 0 mtime should be updated
286         t, err := vol0.Mtime(fooHash)
287         c.Check(err, IsNil)
288         c.Check(t.After(t1), Equals, true)
289         c.Check(t.Before(t2), Equals, true)
290
291         // Volume 1 mtime should not be updated
292         t, err = vol1.Mtime(fooHash)
293         c.Check(err, IsNil)
294         c.Check(t.Before(t1), Equals, true)
295
296         err = vol0.BlockTrash(fooHash)
297         c.Assert(err, IsNil)
298         err = vol1.BlockTrash(fooHash)
299         c.Assert(err, IsNil)
300         resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
301         c.Check(resp.Code, Equals, http.StatusNotFound)
302 }
303
304 func (s *routerSuite) TestBlockTrash(c *C) {
305         router, cancel := testRouter(c, s.cluster, nil)
306         defer cancel()
307
308         vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
309         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
310         c.Assert(err, IsNil)
311         err = vol0.blockTouchWithTime(fooHash, time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration()))
312         c.Assert(err, IsNil)
313         resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
314         c.Check(resp.Code, Equals, http.StatusOK)
315         c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
316         err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
317         c.Assert(err, Equals, os.ErrNotExist)
318 }
319
320 func (s *routerSuite) TestBlockUntrash(c *C) {
321         router, cancel := testRouter(c, s.cluster, nil)
322         defer cancel()
323
324         vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
325         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
326         c.Assert(err, IsNil)
327         err = vol0.BlockTrash(fooHash)
328         c.Assert(err, IsNil)
329         err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
330         c.Assert(err, Equals, os.ErrNotExist)
331         resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
332         c.Check(resp.Code, Equals, http.StatusOK)
333         c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
334         err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
335         c.Check(err, IsNil)
336 }
337
338 func (s *routerSuite) TestBadRequest(c *C) {
339         router, cancel := testRouter(c, s.cluster, nil)
340         defer cancel()
341
342         for _, trial := range []string{
343                 "GET /",
344                 "GET /xyz",
345                 "GET /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabcdefg",
346                 "GET /untrash",
347                 "GET /mounts/blocks/123",
348                 "GET /trash",
349                 "GET /pull",
350                 "GET /debug.json",  // old endpoint, no longer exists
351                 "GET /status.json", // old endpoint, no longer exists
352                 "GET //mounts",
353                 "GET //index",
354                 "PUT //aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
355                 "POST /",
356                 "POST /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
357                 "POST /trash",
358                 "PROPFIND /",
359                 "MAKE-COFFEE /",
360         } {
361                 c.Logf("=== %s", trial)
362                 methodpath := strings.Split(trial, " ")
363                 req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
364                 resp := httptest.NewRecorder()
365                 router.ServeHTTP(resp, req)
366                 c.Check(resp.Code, Equals, http.StatusBadRequest)
367         }
368 }
369
370 func (s *routerSuite) TestRequireAdminMgtToken(c *C) {
371         router, cancel := testRouter(c, s.cluster, nil)
372         defer cancel()
373
374         for _, token := range []string{"badtoken", ""} {
375                 for _, trial := range []string{
376                         "PUT /pull",
377                         "PUT /trash",
378                         "GET /index",
379                         "GET /index/",
380                         "GET /index/1234",
381                         "PUT /untrash/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
382                 } {
383                         c.Logf("=== %s", trial)
384                         methodpath := strings.Split(trial, " ")
385                         req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
386                         if token != "" {
387                                 req.Header.Set("Authorization", "Bearer "+token)
388                         }
389                         resp := httptest.NewRecorder()
390                         router.ServeHTTP(resp, req)
391                         if token == "" {
392                                 c.Check(resp.Code, Equals, http.StatusUnauthorized)
393                         } else {
394                                 c.Check(resp.Code, Equals, http.StatusForbidden)
395                         }
396                 }
397         }
398         req := httptest.NewRequest("TOUCH", "http://example/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil)
399         resp := httptest.NewRecorder()
400         router.ServeHTTP(resp, req)
401         c.Check(resp.Code, Equals, http.StatusUnauthorized)
402 }
403
404 func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
405         router, cancel := testRouter(c, s.cluster, nil)
406         defer cancel()
407         router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
408                 return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
409         }
410
411         // To test whether we fall back to volume 1 after volume 0
412         // returns an error, we need to use a block whose rendezvous
413         // order has volume 0 first. Luckily "bar" is such a block.
414         c.Assert(router.keepstore.rendezvous(barHash, router.keepstore.mountsR)[0].UUID, DeepEquals, router.keepstore.mountsR[0].UUID)
415
416         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, barHash+"+3")
417
418         // Volume 0 fails with an error that specifies an HTTP status
419         // code, so that code should be propagated to caller.
420         resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
421         c.Check(resp.Code, Equals, http.StatusBadGateway)
422         c.Check(resp.Body.String(), Equals, "test error\n")
423
424         router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
425                 return errors.New("no http status provided")
426         }
427         resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
428         c.Check(resp.Code, Equals, http.StatusInternalServerError)
429         c.Check(resp.Body.String(), Equals, "no http status provided\n")
430
431         c.Assert(router.keepstore.mountsW[1].volume.BlockWrite(context.Background(), barHash, []byte("bar")), IsNil)
432
433         // If the requested block is available on the second volume,
434         // it doesn't matter that the first volume failed.
435         resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
436         c.Check(resp.Code, Equals, http.StatusOK)
437         c.Check(resp.Body.String(), Equals, "bar")
438 }
439
440 func (s *routerSuite) TestIndex(c *C) {
441         router, cancel := testRouter(c, s.cluster, nil)
442         defer cancel()
443
444         resp := call(router, "GET", "http://example/index", s.cluster.SystemRootToken, nil, nil)
445         c.Check(resp.Code, Equals, http.StatusOK)
446         c.Check(resp.Body.String(), Equals, "\n")
447
448         resp = call(router, "GET", "http://example/index?prefix=fff", s.cluster.SystemRootToken, nil, nil)
449         c.Check(resp.Code, Equals, http.StatusOK)
450         c.Check(resp.Body.String(), Equals, "\n")
451
452         t0 := time.Now().Add(-time.Hour)
453         vol0 := router.keepstore.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume)
454         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
455         c.Assert(err, IsNil)
456         err = vol0.blockTouchWithTime(fooHash, t0)
457         c.Assert(err, IsNil)
458         err = vol0.BlockWrite(context.Background(), barHash, []byte("bar"))
459         c.Assert(err, IsNil)
460         err = vol0.blockTouchWithTime(barHash, t0)
461         c.Assert(err, IsNil)
462         t1 := time.Now().Add(-time.Minute)
463         vol1 := router.keepstore.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume)
464         err = vol1.BlockWrite(context.Background(), barHash, []byte("bar"))
465         c.Assert(err, IsNil)
466         err = vol1.blockTouchWithTime(barHash, t1)
467         c.Assert(err, IsNil)
468
469         for _, path := range []string{
470                 "/index?prefix=acb",
471                 "/index/acb",
472                 "/index/?prefix=acb",
473                 "/mounts/zzzzz-nyw5e-000000000000000/blocks?prefix=acb",
474                 "/mounts/zzzzz-nyw5e-000000000000000/blocks/?prefix=acb",
475                 "/mounts/zzzzz-nyw5e-000000000000000/blocks/acb",
476         } {
477                 c.Logf("=== %s", path)
478                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
479                 c.Check(resp.Code, Equals, http.StatusOK)
480                 c.Check(resp.Body.String(), Equals, fooHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n\n")
481         }
482
483         for _, path := range []string{
484                 "/index?prefix=37",
485                 "/index/37",
486                 "/index/?prefix=37",
487         } {
488                 c.Logf("=== %s", path)
489                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
490                 c.Check(resp.Code, Equals, http.StatusOK)
491                 c.Check(resp.Body.String(), Equals, ""+
492                         barHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n"+
493                         barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
494         }
495
496         for _, path := range []string{
497                 "/mounts/zzzzz-nyw5e-111111111111111/blocks",
498                 "/mounts/zzzzz-nyw5e-111111111111111/blocks/",
499                 "/mounts/zzzzz-nyw5e-111111111111111/blocks?prefix=37",
500                 "/mounts/zzzzz-nyw5e-111111111111111/blocks/?prefix=37",
501                 "/mounts/zzzzz-nyw5e-111111111111111/blocks/37",
502         } {
503                 c.Logf("=== %s", path)
504                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
505                 c.Check(resp.Code, Equals, http.StatusOK)
506                 c.Check(resp.Body.String(), Equals, barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
507         }
508
509         for _, path := range []string{
510                 "/index",
511                 "/index?prefix=",
512                 "/index/",
513                 "/index/?prefix=",
514         } {
515                 c.Logf("=== %s", path)
516                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
517                 c.Check(resp.Code, Equals, http.StatusOK)
518                 c.Check(strings.Split(resp.Body.String(), "\n"), HasLen, 5)
519         }
520 }
521
522 // Check that the context passed to a volume method gets cancelled
523 // when the http client hangs up.
524 func (s *routerSuite) TestCancelOnDisconnect(c *C) {
525         router, cancel := testRouter(c, s.cluster, nil)
526         defer cancel()
527
528         unblock := make(chan struct{})
529         router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error {
530                 <-unblock
531                 c.Check(ctx.Err(), NotNil)
532                 return ctx.Err()
533         }
534         go func() {
535                 time.Sleep(time.Second / 10)
536                 cancel()
537                 close(unblock)
538         }()
539         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
540         ctx, cancel := context.WithCancel(context.Background())
541         defer cancel()
542         req, err := http.NewRequestWithContext(ctx, "GET", "http://example/"+locSigned, nil)
543         c.Assert(err, IsNil)
544         req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
545         resp := httptest.NewRecorder()
546         router.ServeHTTP(resp, req)
547         c.Check(resp.Code, Equals, 499)
548 }
549
550 func (s *routerSuite) TestCORSPreflight(c *C) {
551         router, cancel := testRouter(c, s.cluster, nil)
552         defer cancel()
553
554         for _, path := range []string{"/", "/whatever", "/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123"} {
555                 c.Logf("=== %s", path)
556                 resp := call(router, http.MethodOptions, "http://example"+path, arvadostest.ActiveTokenV2, nil, nil)
557                 c.Check(resp.Code, Equals, http.StatusOK)
558                 c.Check(resp.Body.String(), Equals, "")
559                 checkCORSHeaders(c, resp.Header())
560         }
561 }
562
563 func call(handler http.Handler, method, path, tok string, body []byte, hdr http.Header) *httptest.ResponseRecorder {
564         resp := httptest.NewRecorder()
565         req, err := http.NewRequest(method, path, bytes.NewReader(body))
566         if err != nil {
567                 panic(err)
568         }
569         for k := range hdr {
570                 req.Header.Set(k, hdr.Get(k))
571         }
572         if tok != "" {
573                 req.Header.Set("Authorization", "Bearer "+tok)
574         }
575         handler.ServeHTTP(resp, req)
576         return resp
577 }
578
579 func checkCORSHeaders(c *C, h http.Header) {
580         c.Check(h.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, PUT, OPTIONS")
581         c.Check(h.Get("Access-Control-Allow-Origin"), Equals, "*")
582         c.Check(h.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas, X-Keep-Signature, X-Keep-Storage-Classes")
583         c.Check(h.Get("Access-Control-Expose-Headers"), Equals, "X-Keep-Locator, X-Keep-Replicas-Stored, X-Keep-Storage-Classes-Confirmed")
584 }