21943: Add test for files appearing multiple times in output
[arvados.git] / services / keepstore / router_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "net/http"
15         "net/http/httptest"
16         "os"
17         "sort"
18         "strings"
19         "time"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadostest"
23         "git.arvados.org/arvados.git/sdk/go/httpserver"
24         "github.com/prometheus/client_golang/prometheus"
25         . "gopkg.in/check.v1"
26 )
27
28 // routerSuite tests that the router correctly translates HTTP
29 // requests to the appropriate keepstore functionality, and translates
30 // the results to HTTP responses.
31 type routerSuite struct {
32         cluster *arvados.Cluster
33 }
34
35 var _ = Suite(&routerSuite{})
36
37 func testRouter(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*router, context.CancelFunc) {
38         if reg == nil {
39                 reg = prometheus.NewRegistry()
40         }
41         ctx, cancel := context.WithCancel(context.Background())
42         ks, kcancel := testKeepstore(t, cluster, reg)
43         go func() {
44                 <-ctx.Done()
45                 kcancel()
46         }()
47         puller := newPuller(ctx, ks, reg)
48         trasher := newTrasher(ctx, ks, reg)
49         return newRouter(ks, puller, trasher).(*router), cancel
50 }
51
52 func (s *routerSuite) SetUpTest(c *C) {
53         s.cluster = testCluster(c)
54         s.cluster.Volumes = map[string]arvados.Volume{
55                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass1": true}},
56                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"testclass2": true}},
57         }
58         s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
59                 "testclass1": arvados.StorageClassConfig{
60                         Default: true,
61                 },
62                 "testclass2": arvados.StorageClassConfig{
63                         Default: true,
64                 },
65         }
66 }
67
68 func (s *routerSuite) TestBlockRead_Token(c *C) {
69         router, cancel := testRouter(c, s.cluster, nil)
70         defer cancel()
71
72         err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
73         c.Assert(err, IsNil)
74         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
75         c.Assert(locSigned, Not(Equals), fooHash+"+3")
76
77         // No token provided
78         resp := call(router, "GET", "http://example/"+locSigned, "", nil, nil)
79         c.Check(resp.Code, Equals, http.StatusUnauthorized)
80         c.Check(resp.Body.String(), Matches, "no token provided in Authorization header\n")
81         checkCORSHeaders(c, resp.Header())
82
83         // Different token => invalid signature
84         resp = call(router, "GET", "http://example/"+locSigned, "badtoken", nil, nil)
85         c.Check(resp.Code, Equals, http.StatusBadRequest)
86         c.Check(resp.Body.String(), Equals, "invalid signature\n")
87         checkCORSHeaders(c, resp.Header())
88
89         // Correct token
90         resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
91         c.Check(resp.Code, Equals, http.StatusOK)
92         c.Check(resp.Body.String(), Equals, "foo")
93         checkCORSHeaders(c, resp.Header())
94
95         // HEAD
96         resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
97         c.Check(resp.Code, Equals, http.StatusOK)
98         c.Check(resp.Result().ContentLength, Equals, int64(3))
99         c.Check(resp.Body.String(), Equals, "")
100         checkCORSHeaders(c, resp.Header())
101 }
102
103 // As a special case we allow HEAD requests that only provide a hash
104 // without a size hint. This accommodates uses of keep-block-check
105 // where it's inconvenient to attach size hints to known hashes.
106 //
107 // GET requests must provide a size hint -- otherwise we can't
108 // propagate a checksum mismatch error.
109 func (s *routerSuite) TestBlockRead_NoSizeHint(c *C) {
110         s.cluster.Collections.BlobSigning = true
111         router, cancel := testRouter(c, s.cluster, nil)
112         defer cancel()
113         err := router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
114         c.Assert(err, IsNil)
115
116         // hash+signature
117         hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash)
118         resp := call(router, "GET", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
119         c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
120
121         resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
122         c.Check(resp.Code, Equals, http.StatusUnauthorized)
123         resp = call(router, "HEAD", "http://example/"+fooHash+"+3", "", nil, nil)
124         c.Check(resp.Code, Equals, http.StatusUnauthorized)
125
126         s.cluster.Collections.BlobSigning = false
127         router, cancel = testRouter(c, s.cluster, nil)
128         defer cancel()
129         err = router.keepstore.mountsW[0].BlockWrite(context.Background(), fooHash, []byte("foo"))
130         c.Assert(err, IsNil)
131
132         resp = call(router, "GET", "http://example/"+fooHash, "", nil, nil)
133         c.Check(resp.Code, Equals, http.StatusMethodNotAllowed)
134
135         resp = call(router, "HEAD", "http://example/"+fooHash, "", nil, nil)
136         c.Check(resp.Code, Equals, http.StatusOK)
137         c.Check(resp.Body.String(), Equals, "")
138         c.Check(resp.Result().ContentLength, Equals, int64(3))
139         c.Check(resp.Header().Get("Content-Length"), Equals, "3")
140 }
141
142 // By the time we discover the checksum mismatch, it's too late to
143 // change the response code, but the expected block size is given in
144 // the Content-Length response header, so a generic http client can
145 // detect the problem.
146 func (s *routerSuite) TestBlockRead_ChecksumMismatch(c *C) {
147         router, cancel := testRouter(c, s.cluster, nil)
148         defer cancel()
149
150         gooddata := make([]byte, 10_000_000)
151         gooddata[0] = 'a'
152         hash := fmt.Sprintf("%x", md5.Sum(gooddata))
153         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fmt.Sprintf("%s+%d", hash, len(gooddata)))
154
155         for _, baddata := range [][]byte{
156                 make([]byte, 3),
157                 make([]byte, len(gooddata)),
158                 make([]byte, len(gooddata)-1),
159                 make([]byte, len(gooddata)+1),
160                 make([]byte, len(gooddata)*2),
161         } {
162                 c.Logf("=== baddata len %d", len(baddata))
163                 err := router.keepstore.mountsW[0].BlockWrite(context.Background(), hash, baddata)
164                 c.Assert(err, IsNil)
165
166                 resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
167                 if !c.Check(resp.Code, Equals, http.StatusOK) {
168                         c.Logf("resp.Body: %s", resp.Body.String())
169                 }
170                 c.Check(resp.Body.Len(), Not(Equals), len(gooddata))
171                 c.Check(resp.Result().ContentLength, Equals, int64(len(gooddata)))
172                 checkCORSHeaders(c, resp.Header())
173
174                 resp = call(router, "HEAD", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
175                 c.Check(resp.Code, Equals, http.StatusBadGateway)
176                 checkCORSHeaders(c, resp.Header())
177
178                 hashSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, hash)
179                 resp = call(router, "HEAD", "http://example/"+hashSigned, arvadostest.ActiveTokenV2, nil, nil)
180                 c.Check(resp.Code, Equals, http.StatusBadGateway)
181                 checkCORSHeaders(c, resp.Header())
182         }
183 }
184
185 func (s *routerSuite) TestBlockWrite(c *C) {
186         router, cancel := testRouter(c, s.cluster, nil)
187         defer cancel()
188
189         resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), nil)
190         c.Check(resp.Code, Equals, http.StatusOK)
191         checkCORSHeaders(c, resp.Header())
192         locator := strings.TrimSpace(resp.Body.String())
193
194         resp = call(router, "GET", "http://example/"+locator, arvadostest.ActiveTokenV2, nil, nil)
195         c.Check(resp.Code, Equals, http.StatusOK)
196         c.Check(resp.Body.String(), Equals, "foo")
197 }
198
199 func (s *routerSuite) TestBlockWrite_Headers(c *C) {
200         router, cancel := testRouter(c, s.cluster, nil)
201         defer cancel()
202
203         resp := call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Desired-Replicas": []string{"2"}})
204         c.Check(resp.Code, Equals, http.StatusOK)
205         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
206         c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), Equals, "testclass1=1")
207
208         resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{"testclass1"}})
209         c.Check(resp.Code, Equals, http.StatusOK)
210         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
211         c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass1=1")
212
213         resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{" , testclass2 , "}})
214         c.Check(resp.Code, Equals, http.StatusOK)
215         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "1")
216         c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), Equals, "testclass2=1")
217
218         resp = call(router, "PUT", "http://example/"+fooHash, arvadostest.ActiveTokenV2, []byte("foo"), http.Header{"X-Keep-Storage-Classes": []string{"testclass1, testclass2"}})
219         c.Check(resp.Code, Equals, http.StatusOK)
220         c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), Equals, "2")
221         confirmed := strings.Split(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), ", ")
222         sort.Strings(confirmed)
223         c.Check(confirmed, DeepEquals, []string{"testclass1=1", "testclass2=1"})
224 }
225
226 func sortCommaSeparated(s string) string {
227         slice := strings.Split(s, ", ")
228         sort.Strings(slice)
229         return strings.Join(slice, ", ")
230 }
231
232 func (s *routerSuite) TestBlockTouch(c *C) {
233         router, cancel := testRouter(c, s.cluster, nil)
234         defer cancel()
235
236         resp := call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
237         c.Check(resp.Code, Equals, http.StatusNotFound)
238
239         vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
240         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
241         c.Assert(err, IsNil)
242         vol1 := router.keepstore.mountsW[1].volume.(*stubVolume)
243         err = vol1.BlockWrite(context.Background(), fooHash, []byte("foo"))
244         c.Assert(err, IsNil)
245
246         t1 := time.Now()
247         resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
248         c.Check(resp.Code, Equals, http.StatusOK)
249         t2 := time.Now()
250
251         // Unauthorized request is a no-op
252         resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", arvadostest.ActiveTokenV2, nil, nil)
253         c.Check(resp.Code, Equals, http.StatusForbidden)
254
255         // Volume 0 mtime should be updated
256         t, err := vol0.Mtime(fooHash)
257         c.Check(err, IsNil)
258         c.Check(t.After(t1), Equals, true)
259         c.Check(t.Before(t2), Equals, true)
260
261         // Volume 1 mtime should not be updated
262         t, err = vol1.Mtime(fooHash)
263         c.Check(err, IsNil)
264         c.Check(t.Before(t1), Equals, true)
265
266         err = vol0.BlockTrash(fooHash)
267         c.Assert(err, IsNil)
268         err = vol1.BlockTrash(fooHash)
269         c.Assert(err, IsNil)
270         resp = call(router, "TOUCH", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
271         c.Check(resp.Code, Equals, http.StatusNotFound)
272 }
273
274 func (s *routerSuite) TestBlockTrash(c *C) {
275         router, cancel := testRouter(c, s.cluster, nil)
276         defer cancel()
277
278         vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
279         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
280         c.Assert(err, IsNil)
281         err = vol0.blockTouchWithTime(fooHash, time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration()))
282         c.Assert(err, IsNil)
283         resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
284         c.Check(resp.Code, Equals, http.StatusOK)
285         c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
286         err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
287         c.Assert(err, Equals, os.ErrNotExist)
288 }
289
290 func (s *routerSuite) TestBlockUntrash(c *C) {
291         router, cancel := testRouter(c, s.cluster, nil)
292         defer cancel()
293
294         vol0 := router.keepstore.mountsW[0].volume.(*stubVolume)
295         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
296         c.Assert(err, IsNil)
297         err = vol0.BlockTrash(fooHash)
298         c.Assert(err, IsNil)
299         err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
300         c.Assert(err, Equals, os.ErrNotExist)
301         resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
302         c.Check(resp.Code, Equals, http.StatusOK)
303         c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
304         err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
305         c.Check(err, IsNil)
306 }
307
308 func (s *routerSuite) TestBadRequest(c *C) {
309         router, cancel := testRouter(c, s.cluster, nil)
310         defer cancel()
311
312         for _, trial := range []string{
313                 "GET /",
314                 "GET /xyz",
315                 "GET /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabcdefg",
316                 "GET /untrash",
317                 "GET /mounts/blocks/123",
318                 "GET /trash",
319                 "GET /pull",
320                 "GET /debug.json",  // old endpoint, no longer exists
321                 "GET /status.json", // old endpoint, no longer exists
322                 "POST /",
323                 "POST /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
324                 "POST /trash",
325                 "PROPFIND /",
326                 "MAKE-COFFEE /",
327         } {
328                 c.Logf("=== %s", trial)
329                 methodpath := strings.Split(trial, " ")
330                 req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
331                 resp := httptest.NewRecorder()
332                 router.ServeHTTP(resp, req)
333                 c.Check(resp.Code, Equals, http.StatusBadRequest)
334         }
335 }
336
337 func (s *routerSuite) TestRequireAdminMgtToken(c *C) {
338         router, cancel := testRouter(c, s.cluster, nil)
339         defer cancel()
340
341         for _, token := range []string{"badtoken", ""} {
342                 for _, trial := range []string{
343                         "PUT /pull",
344                         "PUT /trash",
345                         "GET /index",
346                         "GET /index/",
347                         "GET /index/1234",
348                         "PUT /untrash/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
349                 } {
350                         c.Logf("=== %s", trial)
351                         methodpath := strings.Split(trial, " ")
352                         req := httptest.NewRequest(methodpath[0], "http://example"+methodpath[1], nil)
353                         if token != "" {
354                                 req.Header.Set("Authorization", "Bearer "+token)
355                         }
356                         resp := httptest.NewRecorder()
357                         router.ServeHTTP(resp, req)
358                         if token == "" {
359                                 c.Check(resp.Code, Equals, http.StatusUnauthorized)
360                         } else {
361                                 c.Check(resp.Code, Equals, http.StatusForbidden)
362                         }
363                 }
364         }
365         req := httptest.NewRequest("TOUCH", "http://example/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil)
366         resp := httptest.NewRecorder()
367         router.ServeHTTP(resp, req)
368         c.Check(resp.Code, Equals, http.StatusUnauthorized)
369 }
370
371 func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
372         router, cancel := testRouter(c, s.cluster, nil)
373         defer cancel()
374         router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
375                 return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
376         }
377
378         // To test whether we fall back to volume 1 after volume 0
379         // returns an error, we need to use a block whose rendezvous
380         // order has volume 0 first. Luckily "bar" is such a block.
381         c.Assert(router.keepstore.rendezvous(barHash, router.keepstore.mountsR)[0].UUID, DeepEquals, router.keepstore.mountsR[0].UUID)
382
383         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, barHash+"+3")
384
385         // Volume 0 fails with an error that specifies an HTTP status
386         // code, so that code should be propagated to caller.
387         resp := call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
388         c.Check(resp.Code, Equals, http.StatusBadGateway)
389         c.Check(resp.Body.String(), Equals, "test error\n")
390
391         router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
392                 return errors.New("no http status provided")
393         }
394         resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
395         c.Check(resp.Code, Equals, http.StatusInternalServerError)
396         c.Check(resp.Body.String(), Equals, "no http status provided\n")
397
398         c.Assert(router.keepstore.mountsW[1].volume.BlockWrite(context.Background(), barHash, []byte("bar")), IsNil)
399
400         // If the requested block is available on the second volume,
401         // it doesn't matter that the first volume failed.
402         resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
403         c.Check(resp.Code, Equals, http.StatusOK)
404         c.Check(resp.Body.String(), Equals, "bar")
405 }
406
407 func (s *routerSuite) TestIndex(c *C) {
408         router, cancel := testRouter(c, s.cluster, nil)
409         defer cancel()
410
411         resp := call(router, "GET", "http://example/index", s.cluster.SystemRootToken, nil, nil)
412         c.Check(resp.Code, Equals, http.StatusOK)
413         c.Check(resp.Body.String(), Equals, "\n")
414
415         resp = call(router, "GET", "http://example/index?prefix=fff", s.cluster.SystemRootToken, nil, nil)
416         c.Check(resp.Code, Equals, http.StatusOK)
417         c.Check(resp.Body.String(), Equals, "\n")
418
419         t0 := time.Now().Add(-time.Hour)
420         vol0 := router.keepstore.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume)
421         err := vol0.BlockWrite(context.Background(), fooHash, []byte("foo"))
422         c.Assert(err, IsNil)
423         err = vol0.blockTouchWithTime(fooHash, t0)
424         c.Assert(err, IsNil)
425         err = vol0.BlockWrite(context.Background(), barHash, []byte("bar"))
426         c.Assert(err, IsNil)
427         err = vol0.blockTouchWithTime(barHash, t0)
428         c.Assert(err, IsNil)
429         t1 := time.Now().Add(-time.Minute)
430         vol1 := router.keepstore.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume)
431         err = vol1.BlockWrite(context.Background(), barHash, []byte("bar"))
432         c.Assert(err, IsNil)
433         err = vol1.blockTouchWithTime(barHash, t1)
434         c.Assert(err, IsNil)
435
436         for _, path := range []string{
437                 "/index?prefix=acb",
438                 "/index/acb",
439                 "/index/?prefix=acb",
440                 "/mounts/zzzzz-nyw5e-000000000000000/blocks?prefix=acb",
441                 "/mounts/zzzzz-nyw5e-000000000000000/blocks/?prefix=acb",
442                 "/mounts/zzzzz-nyw5e-000000000000000/blocks/acb",
443         } {
444                 c.Logf("=== %s", path)
445                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
446                 c.Check(resp.Code, Equals, http.StatusOK)
447                 c.Check(resp.Body.String(), Equals, fooHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n\n")
448         }
449
450         for _, path := range []string{
451                 "/index?prefix=37",
452                 "/index/37",
453                 "/index/?prefix=37",
454         } {
455                 c.Logf("=== %s", path)
456                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
457                 c.Check(resp.Code, Equals, http.StatusOK)
458                 c.Check(resp.Body.String(), Equals, ""+
459                         barHash+"+3 "+fmt.Sprintf("%d", t0.UnixNano())+"\n"+
460                         barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
461         }
462
463         for _, path := range []string{
464                 "/mounts/zzzzz-nyw5e-111111111111111/blocks",
465                 "/mounts/zzzzz-nyw5e-111111111111111/blocks/",
466                 "/mounts/zzzzz-nyw5e-111111111111111/blocks?prefix=37",
467                 "/mounts/zzzzz-nyw5e-111111111111111/blocks/?prefix=37",
468                 "/mounts/zzzzz-nyw5e-111111111111111/blocks/37",
469         } {
470                 c.Logf("=== %s", path)
471                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
472                 c.Check(resp.Code, Equals, http.StatusOK)
473                 c.Check(resp.Body.String(), Equals, barHash+"+3 "+fmt.Sprintf("%d", t1.UnixNano())+"\n\n")
474         }
475
476         for _, path := range []string{
477                 "/index",
478                 "/index?prefix=",
479                 "/index/",
480                 "/index/?prefix=",
481         } {
482                 c.Logf("=== %s", path)
483                 resp = call(router, "GET", "http://example"+path, s.cluster.SystemRootToken, nil, nil)
484                 c.Check(resp.Code, Equals, http.StatusOK)
485                 c.Check(strings.Split(resp.Body.String(), "\n"), HasLen, 5)
486         }
487 }
488
489 // Check that the context passed to a volume method gets cancelled
490 // when the http client hangs up.
491 func (s *routerSuite) TestCancelOnDisconnect(c *C) {
492         router, cancel := testRouter(c, s.cluster, nil)
493         defer cancel()
494
495         unblock := make(chan struct{})
496         router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error {
497                 <-unblock
498                 c.Check(ctx.Err(), NotNil)
499                 return ctx.Err()
500         }
501         go func() {
502                 time.Sleep(time.Second / 10)
503                 cancel()
504                 close(unblock)
505         }()
506         locSigned := router.keepstore.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3")
507         ctx, cancel := context.WithCancel(context.Background())
508         defer cancel()
509         req, err := http.NewRequestWithContext(ctx, "GET", "http://example/"+locSigned, nil)
510         c.Assert(err, IsNil)
511         req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
512         resp := httptest.NewRecorder()
513         router.ServeHTTP(resp, req)
514         c.Check(resp.Code, Equals, 499)
515 }
516
517 func (s *routerSuite) TestCORSPreflight(c *C) {
518         router, cancel := testRouter(c, s.cluster, nil)
519         defer cancel()
520
521         for _, path := range []string{"/", "/whatever", "/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123"} {
522                 c.Logf("=== %s", path)
523                 resp := call(router, http.MethodOptions, "http://example"+path, arvadostest.ActiveTokenV2, nil, nil)
524                 c.Check(resp.Code, Equals, http.StatusOK)
525                 c.Check(resp.Body.String(), Equals, "")
526                 checkCORSHeaders(c, resp.Header())
527         }
528 }
529
530 func call(handler http.Handler, method, path, tok string, body []byte, hdr http.Header) *httptest.ResponseRecorder {
531         resp := httptest.NewRecorder()
532         req, err := http.NewRequest(method, path, bytes.NewReader(body))
533         if err != nil {
534                 panic(err)
535         }
536         for k := range hdr {
537                 req.Header.Set(k, hdr.Get(k))
538         }
539         if tok != "" {
540                 req.Header.Set("Authorization", "Bearer "+tok)
541         }
542         handler.ServeHTTP(resp, req)
543         return resp
544 }
545
546 func checkCORSHeaders(c *C, h http.Header) {
547         c.Check(h.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, PUT, OPTIONS")
548         c.Check(h.Get("Access-Control-Allow-Origin"), Equals, "*")
549         c.Check(h.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas, X-Keep-Signature, X-Keep-Storage-Classes")
550         c.Check(h.Get("Access-Control-Expose-Headers"), Equals, "X-Keep-Locator, X-Keep-Replicas-Stored, X-Keep-Storage-Classes-Confirmed")
551 }