services/keepstore/handler_test.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Tests for Keep HTTP handlers:
//
// - GetBlockHandler
// - PutBlockHandler
// - IndexHandler
//
// The HTTP handlers are responsible for enforcing permission policy,
// so these tests must exercise all possible permission permutations.

package keepstore

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
	"sort"
	"strings"
	"sync/atomic"
	"time"

	"git.arvados.org/arvados.git/lib/config"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadostest"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"github.com/prometheus/client_golang/prometheus"
	check "gopkg.in/check.v1"
)

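// testServiceURL is a placeholder service address passed to
// handler.setup(); the tests exercise the handler directly via
// ServeHTTP, so nothing actually listens on it.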
var testServiceURL = func() arvados.URL {
	return arvados.URL{Host: "localhost:12345", Scheme: "http"}
}()

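// testCluster returns a minimal cluster configuration for testing, with
// test tokens installed and blob signing disabled by default.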
func testCluster(t TB) *arvados.Cluster {
	cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
	if err != nil {
		t.Fatal(err)
	}
	cluster, err := cfg.GetCluster("")
	if err != nil {
		t.Fatal(err)
	}
	cluster.SystemRootToken = arvadostest.SystemRootToken
	cluster.ManagementToken = arvadostest.ManagementToken
	cluster.Collections.BlobSigning = false
	return cluster
}

var _ = check.Suite(&HandlerSuite{})

type HandlerSuite struct {
	cluster *arvados.Cluster
	handler *handler
}

func (s *HandlerSuite) SetUpTest(c *check.C) {
	s.cluster = testCluster(c)
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "mock"},
		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "mock"},
	}
	s.handler = &handler{}
}

// A RequestTester represents the parameters for an HTTP request to
// be issued on behalf of a unit test.
type RequestTester struct {
	uri            string
	apiToken       string
	method         string
	requestBody    []byte
	storageClasses string
}

// Test GetBlockHandler in the following situations:
//   - permissions off, unauthenticated request, unsigned locator
//   - permissions on, authenticated request, signed locator
//   - permissions on, authenticated request, unsigned locator
//   - permissions on, unauthenticated request, signed locator
//   - permissions on, authenticated request, expired locator
//   - permissions on, authenticated request, signed locator, transient error from backend
func (s *HandlerSuite) TestGetHandler(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	vols := s.handler.volmgr.AllWritable()
	err := vols[0].Put(context.Background(), TestHash, TestBlock)
	c.Check(err, check.IsNil)

	// Create locators for testing.
	// Turn on permission settings so we can generate signed locators.
	s.cluster.Collections.BlobSigning = true
	s.cluster.Collections.BlobSigningKey = knownKey
	s.cluster.Collections.BlobSigningTTL.Set("5m")

	var (
		unsignedLocator  = "/" + TestHash
		validTimestamp   = time.Now().Add(s.cluster.Collections.BlobSigningTTL.Duration())
		expiredTimestamp = time.Now().Add(-time.Hour)
		signedLocator    = "/" + SignLocator(s.cluster, TestHash, knownToken, validTimestamp)
		expiredLocator   = "/" + SignLocator(s.cluster, TestHash, knownToken, expiredTimestamp)
	)

	// -----------------
	// Test unauthenticated request with permissions off.
	s.cluster.Collections.BlobSigning = false

	// Unauthenticated request, unsigned locator
	// => OK
	response := IssueRequest(s.handler,
		&RequestTester{
			method: "GET",
			uri:    unsignedLocator,
		})
	ExpectStatusCode(c,
		"Unauthenticated request, unsigned locator", http.StatusOK, response)
	ExpectBody(c,
		"Unauthenticated request, unsigned locator",
		string(TestBlock),
		response)

	receivedLen := response.Header().Get("Content-Length")
	expectedLen := fmt.Sprintf("%d", len(TestBlock))
	if receivedLen != expectedLen {
		c.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
	}

	// ----------------
	// Permissions: on.
	s.cluster.Collections.BlobSigning = true

	// Authenticated request, signed locator
	// => OK
	response = IssueRequest(s.handler, &RequestTester{
		method:   "GET",
		uri:      signedLocator,
		apiToken: knownToken,
	})
	ExpectStatusCode(c,
		"Authenticated request, signed locator", http.StatusOK, response)
	ExpectBody(c,
		"Authenticated request, signed locator", string(TestBlock), response)

	receivedLen = response.Header().Get("Content-Length")
	expectedLen = fmt.Sprintf("%d", len(TestBlock))
	if receivedLen != expectedLen {
		c.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
	}

	// Authenticated request, unsigned locator
	// => PermissionError
	response = IssueRequest(s.handler, &RequestTester{
		method:   "GET",
		uri:      unsignedLocator,
		apiToken: knownToken,
	})
	ExpectStatusCode(c, "unsigned locator", PermissionError.HTTPCode, response)

	// Unauthenticated request, signed locator
	// => PermissionError
	response = IssueRequest(s.handler, &RequestTester{
		method: "GET",
		uri:    signedLocator,
	})
	ExpectStatusCode(c,
		"Unauthenticated request, signed locator",
		PermissionError.HTTPCode, response)

	// Authenticated request, expired locator
	// => ExpiredError
	response = IssueRequest(s.handler, &RequestTester{
		method:   "GET",
		uri:      expiredLocator,
		apiToken: knownToken,
	})
	ExpectStatusCode(c,
		"Authenticated request, expired locator",
		ExpiredError.HTTPCode, response)

	// Authenticated request, signed locator
	// => 503 Server busy (transient error)

	// Set up the block-owning volume to respond with errors.
	vols[0].Volume.(*MockVolume).Bad = true
	vols[0].Volume.(*MockVolume).BadVolumeError = VolumeBusyError
	response = IssueRequest(s.handler, &RequestTester{
		method:   "GET",
		uri:      signedLocator,
		apiToken: knownToken,
	})
	// A transient error from one volume while the other doesn't find the block
	// should make the service return a 503 so that clients can retry.
	ExpectStatusCode(c,
		"Volume backend busy",
		503, response)
}

// Test PutBlockHandler in the following situations:
//   - no server key
//   - with server key, authenticated request, unsigned locator
//   - with server key, unauthenticated request, unsigned locator
func (s *HandlerSuite) TestPutHandler(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	// --------------
	// No server key.

	s.cluster.Collections.BlobSigningKey = ""

	// Unauthenticated request, no server key
	// => OK (unsigned response)
	unsignedLocator := "/" + TestHash
	response := IssueRequest(s.handler,
		&RequestTester{
			method:      "PUT",
			uri:         unsignedLocator,
			requestBody: TestBlock,
		})

	ExpectStatusCode(c,
		"Unauthenticated request, no server key", http.StatusOK, response)
	ExpectBody(c,
		"Unauthenticated request, no server key",
		TestHashPutResp, response)

	// ------------------
	// With a server key.

	s.cluster.Collections.BlobSigningKey = knownKey
	s.cluster.Collections.BlobSigningTTL.Set("5m")

	// When a permission key is available, the locator returned
	// from an authenticated PUT request will be signed.

	// Authenticated PUT, signed locator
	// => OK (signed response)
	response = IssueRequest(s.handler,
		&RequestTester{
			method:      "PUT",
			uri:         unsignedLocator,
			requestBody: TestBlock,
			apiToken:    knownToken,
		})

	ExpectStatusCode(c,
		"Authenticated PUT, signed locator, with server key",
		http.StatusOK, response)
	responseLocator := strings.TrimSpace(response.Body.String())
	if VerifySignature(s.cluster, responseLocator, knownToken) != nil {
		c.Errorf("Authenticated PUT, signed locator, with server key:\n"+
			"response '%s' does not contain a valid signature",
			responseLocator)
	}

	// Unauthenticated PUT, unsigned locator
	// => OK
	response = IssueRequest(s.handler,
		&RequestTester{
			method:      "PUT",
			uri:         unsignedLocator,
			requestBody: TestBlock,
		})

	ExpectStatusCode(c,
		"Unauthenticated PUT, unsigned locator, with server key",
		http.StatusOK, response)
	ExpectBody(c,
		"Unauthenticated PUT, unsigned locator, with server key",
		TestHashPutResp, response)
}

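// TestPutAndDeleteSkipReadonlyVolumes checks that PUT and DELETE
// requests are routed only to writable volumes and leave read-only
// volumes untouched.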
func (s *HandlerSuite) TestPutAndDeleteSkipReadonlyVolumes(c *check.C) {
	s.cluster.Volumes["zzzzz-nyw5e-000000000000000"] = arvados.Volume{Driver: "mock", ReadOnly: true}
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	s.cluster.SystemRootToken = "fake-data-manager-token"
	IssueRequest(s.handler,
		&RequestTester{
			method:      "PUT",
			uri:         "/" + TestHash,
			requestBody: TestBlock,
		})

	s.cluster.Collections.BlobTrash = true
	IssueRequest(s.handler,
		&RequestTester{
			method:      "DELETE",
			uri:         "/" + TestHash,
			requestBody: TestBlock,
			apiToken:    s.cluster.SystemRootToken,
		})
	type expect struct {
		volid     string
		method    string
		callcount int
	}
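	// The read-only volume (00000...) should see no activity; the PUT
	// and DELETE should be handled entirely by the writable volume
	// (11111...).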
	for _, e := range []expect{
		{"zzzzz-nyw5e-000000000000000", "Get", 0},
		{"zzzzz-nyw5e-000000000000000", "Compare", 0},
		{"zzzzz-nyw5e-000000000000000", "Touch", 0},
		{"zzzzz-nyw5e-000000000000000", "Put", 0},
		{"zzzzz-nyw5e-000000000000000", "Delete", 0},
		{"zzzzz-nyw5e-111111111111111", "Get", 0},
		{"zzzzz-nyw5e-111111111111111", "Compare", 1},
		{"zzzzz-nyw5e-111111111111111", "Touch", 1},
		{"zzzzz-nyw5e-111111111111111", "Put", 1},
		{"zzzzz-nyw5e-111111111111111", "Delete", 1},
	} {
		if calls := s.handler.volmgr.mountMap[e.volid].Volume.(*MockVolume).CallCount(e.method); calls != e.callcount {
			c.Errorf("Got %d %s() on vol %s, expect %d", calls, e.method, e.volid, e.callcount)
		}
	}
}

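// TestReadsOrderedByStorageClassPriority checks that GET requests try
// volumes in decreasing order of storage class priority, with ties
// broken by volume UUID.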
func (s *HandlerSuite) TestReadsOrderedByStorageClassPriority(c *check.C) {
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-111111111111111": {
			Driver:         "mock",
			Replication:    1,
			StorageClasses: map[string]bool{"class1": true}},
		"zzzzz-nyw5e-222222222222222": {
			Driver:         "mock",
			Replication:    1,
			StorageClasses: map[string]bool{"class2": true, "class3": true}},
	}

	for _, trial := range []struct {
		priority1 int // priority of class1, thus vol1
		priority2 int // priority of class2
		priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
		get1      int // expected number of "get" ops on vol1
		get2      int // expected number of "get" ops on vol2
	}{
		{100, 50, 50, 1, 0},   // class1 has higher priority => try vol1 first, no need to try vol2
		{100, 100, 100, 1, 0}, // same priority, vol1 is first lexicographically => try vol1 first and succeed
		{66, 99, 33, 1, 1},    // class2 has higher priority => try vol2 first, then try vol1
		{66, 33, 99, 1, 1},    // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
	} {
		c.Logf("%+v", trial)
		s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
			"class1": {Priority: trial.priority1},
			"class2": {Priority: trial.priority2},
			"class3": {Priority: trial.priority3},
		}
		c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
		IssueRequest(s.handler,
			&RequestTester{
				method:         "PUT",
				uri:            "/" + TestHash,
				requestBody:    TestBlock,
				storageClasses: "class1",
			})
		IssueRequest(s.handler,
			&RequestTester{
				method: "GET",
				uri:    "/" + TestHash,
			})
		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Get"), check.Equals, trial.get1)
		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-222222222222222"].Volume.(*MockVolume).CallCount("Get"), check.Equals, trial.get2)
	}
}

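// TestPutWithNoWritableVolumes checks that a PUT fails with FullError,
// without calling Put on any volume, when the only volume offering the
// requested storage class is read-only.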
func (s *HandlerSuite) TestPutWithNoWritableVolumes(c *check.C) {
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-111111111111111": {
			Driver:         "mock",
			Replication:    1,
			ReadOnly:       true,
			StorageClasses: map[string]bool{"class1": true}},
	}
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
	resp := IssueRequest(s.handler,
		&RequestTester{
			method:         "PUT",
			uri:            "/" + TestHash,
			requestBody:    TestBlock,
			storageClasses: "class1",
		})
	c.Check(resp.Code, check.Equals, FullError.HTTPCode)
	c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Put"), check.Equals, 0)
}

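// TestConcurrentWritesToMultipleStorageClasses checks which volumes
// receive Put and Compare calls when a PUT request names one or more
// storage classes, and how the volume manager's round-robin counter
// (vm.counter) affects which eligible volume is tried first.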
func (s *HandlerSuite) TestConcurrentWritesToMultipleStorageClasses(c *check.C) {
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-111111111111111": {
			Driver:         "mock",
			Replication:    1,
			StorageClasses: map[string]bool{"class1": true}},
		"zzzzz-nyw5e-121212121212121": {
			Driver:         "mock",
			Replication:    1,
			StorageClasses: map[string]bool{"class1": true, "class2": true}},
		"zzzzz-nyw5e-222222222222222": {
			Driver:         "mock",
			Replication:    1,
			StorageClasses: map[string]bool{"class2": true}},
	}

	for _, trial := range []struct {
		setCounter uint32 // value to stuff vm.counter, to control offset
		classes    string // desired classes
		put111     int    // expected number of "put" ops on 11111... after 2x put reqs
		put121     int    // expected number of "put" ops on 12121...
		put222     int    // expected number of "put" ops on 22222...
		cmp111     int    // expected number of "compare" ops on 11111... after 2x put reqs
		cmp121     int    // expected number of "compare" ops on 12121...
		cmp222     int    // expected number of "compare" ops on 22222...
	}{
		{0, "class1",
			1, 0, 0,
			2, 1, 0}, // first put compares on all vols with class1; second put succeeds after checking 121
		{0, "class2",
			0, 1, 0,
			0, 2, 1}, // first put compares on all vols with class2; second put succeeds after checking 121
		{0, "class1,class2",
			1, 1, 0,
			2, 2, 1}, // first put compares on all vols; second put succeeds after checking 111 and 121
		{1, "class1,class2",
			0, 1, 0, // vm.counter offset is 1 so the first volume attempted is 121
			2, 2, 1}, // first put compares on all vols; second put succeeds after checking 111 and 121
		{0, "class1,class2,class404",
			1, 1, 0,
			2, 2, 1}, // first put compares on all vols; second put doesn't compare on 222 because it already satisfied class2 on 121
	} {
		c.Logf("%+v", trial)
		s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
			"class1": {},
			"class2": {},
			"class3": {},
		}
		c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
		atomic.StoreUint32(&s.handler.volmgr.counter, trial.setCounter)
		for i := 0; i < 2; i++ {
			IssueRequest(s.handler,
				&RequestTester{
					method:         "PUT",
					uri:            "/" + TestHash,
					requestBody:    TestBlock,
					storageClasses: trial.classes,
				})
		}
		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put111)
		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-121212121212121"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put121)
		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-222222222222222"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put222)
		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp111)
		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-121212121212121"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp121)
		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-222222222222222"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp222)
	}
}

// Test TOUCH requests.
func (s *HandlerSuite) TestTouchHandler(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
	vols := s.handler.volmgr.AllWritable()
	vols[0].Put(context.Background(), TestHash, TestBlock)
	vols[0].Volume.(*MockVolume).TouchWithDate(TestHash, time.Now().Add(-time.Hour))
	afterPut := time.Now()
	t, err := vols[0].Mtime(TestHash)
	c.Assert(err, check.IsNil)
	c.Assert(t.Before(afterPut), check.Equals, true)

	ExpectStatusCode(c,
		"touch with no credentials",
		http.StatusUnauthorized,
		IssueRequest(s.handler, &RequestTester{
			method: "TOUCH",
			uri:    "/" + TestHash,
		}))

	ExpectStatusCode(c,
		"touch with non-root credentials",
		http.StatusUnauthorized,
		IssueRequest(s.handler, &RequestTester{
			method:   "TOUCH",
			uri:      "/" + TestHash,
			apiToken: arvadostest.ActiveTokenV2,
		}))

	ExpectStatusCode(c,
		"touch non-existent block",
		http.StatusNotFound,
		IssueRequest(s.handler, &RequestTester{
			method:   "TOUCH",
			uri:      "/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
			apiToken: s.cluster.SystemRootToken,
		}))

	beforeTouch := time.Now()
	ExpectStatusCode(c,
		"touch block",
		http.StatusOK,
		IssueRequest(s.handler, &RequestTester{
			method:   "TOUCH",
			uri:      "/" + TestHash,
			apiToken: s.cluster.SystemRootToken,
		}))
	t, err = vols[0].Mtime(TestHash)
	c.Assert(err, check.IsNil)
	c.Assert(t.After(beforeTouch), check.Equals, true)
}

// Test /index requests:
//   - unauthenticated /index request
//   - unauthenticated /index/prefix request
//   - authenticated   /index request        | non-superuser
//   - authenticated   /index/prefix request | non-superuser
//   - authenticated   /index request        | superuser
//   - authenticated   /index/prefix request | superuser
//
// The only /index requests that should succeed are those issued by the
// superuser. They should pass regardless of the value of BlobSigning.
func (s *HandlerSuite) TestIndexHandler(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	// Include multiple blocks on different volumes, and
	// some metadata files (which should be omitted from index listings)
	vols := s.handler.volmgr.AllWritable()
	vols[0].Put(context.Background(), TestHash, TestBlock)
	vols[1].Put(context.Background(), TestHash2, TestBlock2)
	vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
	vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))

	s.cluster.SystemRootToken = "DATA MANAGER TOKEN"

	unauthenticatedReq := &RequestTester{
		method: "GET",
		uri:    "/index",
	}
	authenticatedReq := &RequestTester{
		method:   "GET",
		uri:      "/index",
		apiToken: knownToken,
	}
	superuserReq := &RequestTester{
		method:   "GET",
		uri:      "/index",
		apiToken: s.cluster.SystemRootToken,
	}
	unauthPrefixReq := &RequestTester{
		method: "GET",
		uri:    "/index/" + TestHash[0:3],
	}
	authPrefixReq := &RequestTester{
		method:   "GET",
		uri:      "/index/" + TestHash[0:3],
		apiToken: knownToken,
	}
	superuserPrefixReq := &RequestTester{
		method:   "GET",
		uri:      "/index/" + TestHash[0:3],
		apiToken: s.cluster.SystemRootToken,
	}
	superuserNoSuchPrefixReq := &RequestTester{
		method:   "GET",
		uri:      "/index/abcd",
		apiToken: s.cluster.SystemRootToken,
	}
	superuserInvalidPrefixReq := &RequestTester{
		method:   "GET",
		uri:      "/index/xyz",
		apiToken: s.cluster.SystemRootToken,
	}

	// -------------------------------------------------------------
	// Only the superuser should be allowed to issue /index requests.

	// ---------------------------
	// BlobSigning enabled
	// This setting should not affect the outcome of these requests.
	s.cluster.Collections.BlobSigning = true

	// unauthenticated /index request
	// => UnauthorizedError
	response := IssueRequest(s.handler, unauthenticatedReq)
	ExpectStatusCode(c,
		"permissions on, unauthenticated request",
		UnauthorizedError.HTTPCode,
		response)

	// unauthenticated /index/prefix request
	// => UnauthorizedError
	response = IssueRequest(s.handler, unauthPrefixReq)
	ExpectStatusCode(c,
		"permissions on, unauthenticated /index/prefix request",
		UnauthorizedError.HTTPCode,
		response)

	// authenticated /index request, non-superuser
	// => UnauthorizedError
	response = IssueRequest(s.handler, authenticatedReq)
	ExpectStatusCode(c,
		"permissions on, authenticated request, non-superuser",
		UnauthorizedError.HTTPCode,
		response)

	// authenticated /index/prefix request, non-superuser
	// => UnauthorizedError
	response = IssueRequest(s.handler, authPrefixReq)
	ExpectStatusCode(c,
		"permissions on, authenticated /index/prefix request, non-superuser",
		UnauthorizedError.HTTPCode,
		response)

	// superuser /index request
	// => OK
	response = IssueRequest(s.handler, superuserReq)
	ExpectStatusCode(c,
		"permissions on, superuser request",
		http.StatusOK,
		response)

	// ----------------------------
	// BlobSigning disabled
	// A valid request should still pass.
	s.cluster.Collections.BlobSigning = false

	// superuser /index request
	// => OK
	response = IssueRequest(s.handler, superuserReq)
	ExpectStatusCode(c,
		"permissions off, superuser request",
		http.StatusOK,
		response)

	expected := `^` + TestHash + `\+\d+ \d+\n` +
		TestHash2 + `\+\d+ \d+\n\n$`
	c.Check(response.Body.String(), check.Matches, expected, check.Commentf(
		"permissions off, superuser request"))

	// superuser /index/prefix request
	// => OK
	response = IssueRequest(s.handler, superuserPrefixReq)
	ExpectStatusCode(c,
		"permissions off, superuser /index/prefix request",
		http.StatusOK,
		response)

	expected = `^` + TestHash + `\+\d+ \d+\n\n$`
	c.Check(response.Body.String(), check.Matches, expected, check.Commentf(
		"permissions off, superuser /index/prefix request"))

	// superuser /index/{no-such-prefix} request
	// => OK
	response = IssueRequest(s.handler, superuserNoSuchPrefixReq)
	ExpectStatusCode(c,
		"permissions off, superuser /index/{no-such-prefix} request",
		http.StatusOK,
		response)

	if response.Body.String() != "\n" {
		c.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
	}

	// superuser /index/{invalid-prefix} request
	// => StatusBadRequest
	response = IssueRequest(s.handler, superuserInvalidPrefixReq)
	ExpectStatusCode(c,
		"permissions off, superuser /index/{invalid-prefix} request",
		http.StatusBadRequest,
		response)
}

// TestDeleteHandler
//
// Cases tested:
//
//	With no token and with a non-data-manager token:
//	* Delete existing block
//	  (test for 403 Forbidden, confirm block not deleted)
//
//	With data manager token:
//
//	* Delete existing block
//	  (test for 200 OK, response counts, confirm block deleted)
//
//	* Delete nonexistent block
//	  (test for 200 OK, response counts)
//
//	TODO(twp):
//
//	* Delete block on read-only and read-write volume
//	  (test for 200 OK, response with copies_deleted=1,
//	  copies_failed=1, confirm block deleted only on r/w volume)
//
//	* Delete block on read-only volume only
//	  (test for 200 OK, response with copies_deleted=0, copies_failed=1,
//	  confirm block not deleted)
func (s *HandlerSuite) TestDeleteHandler(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	vols := s.handler.volmgr.AllWritable()
	vols[0].Put(context.Background(), TestHash, TestBlock)

	// Explicitly set the BlobSigningTTL to 0 for these
	// tests, to ensure the MockVolume deletes the blocks
	// even though they have just been created.
	s.cluster.Collections.BlobSigningTTL = arvados.Duration(0)

	var userToken = "NOT DATA MANAGER TOKEN"
	s.cluster.SystemRootToken = "DATA MANAGER TOKEN"

	s.cluster.Collections.BlobTrash = true

	unauthReq := &RequestTester{
		method: "DELETE",
		uri:    "/" + TestHash,
	}

	userReq := &RequestTester{
		method:   "DELETE",
		uri:      "/" + TestHash,
		apiToken: userToken,
	}

	superuserExistingBlockReq := &RequestTester{
		method:   "DELETE",
		uri:      "/" + TestHash,
		apiToken: s.cluster.SystemRootToken,
	}

	superuserNonexistentBlockReq := &RequestTester{
		method:   "DELETE",
		uri:      "/" + TestHash2,
		apiToken: s.cluster.SystemRootToken,
	}

	// Unauthenticated request returns PermissionError.
	response := IssueRequest(s.handler, unauthReq)
	ExpectStatusCode(c,
		"unauthenticated request",
		PermissionError.HTTPCode,
		response)

	// Authenticated non-admin request returns PermissionError.
	response = IssueRequest(s.handler, userReq)
	ExpectStatusCode(c,
		"authenticated non-admin request",
		PermissionError.HTTPCode,
		response)

	// Authenticated admin request for nonexistent block.
	type deletecounter struct {
		Deleted int `json:"copies_deleted"`
		Failed  int `json:"copies_failed"`
	}
	var responseDc, expectedDc deletecounter

	response = IssueRequest(s.handler, superuserNonexistentBlockReq)
	ExpectStatusCode(c,
		"data manager request, nonexistent block",
		http.StatusNotFound,
		response)

	// Authenticated admin request for existing block while BlobTrash is false.
	s.cluster.Collections.BlobTrash = false
	response = IssueRequest(s.handler, superuserExistingBlockReq)
	ExpectStatusCode(c,
		"authenticated request, existing block, method disabled",
		MethodDisabledError.HTTPCode,
		response)
	s.cluster.Collections.BlobTrash = true

	// Authenticated admin request for existing block.
	response = IssueRequest(s.handler, superuserExistingBlockReq)
	ExpectStatusCode(c,
		"data manager request, existing block",
		http.StatusOK,
		response)
	// Expect response {"copies_deleted":1,"copies_failed":0}
	expectedDc = deletecounter{1, 0}
	json.NewDecoder(response.Body).Decode(&responseDc)
	if responseDc != expectedDc {
		c.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
			expectedDc, responseDc)
	}
	// Confirm the block has been deleted.
	buf := make([]byte, BlockSize)
	_, err := vols[0].Get(context.Background(), TestHash, buf)
	var blockDeleted = os.IsNotExist(err)
	if !blockDeleted {
		c.Error("superuserExistingBlockReq: block not deleted")
	}

	// A DELETE request on a block newer than BlobSigningTTL
	// should return success but leave the block on the volume.
	vols[0].Put(context.Background(), TestHash, TestBlock)
	s.cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour)

	response = IssueRequest(s.handler, superuserExistingBlockReq)
	ExpectStatusCode(c,
		"data manager request, existing block",
		http.StatusOK,
		response)
	// Expect response {"copies_deleted":1,"copies_failed":0}
	expectedDc = deletecounter{1, 0}
	json.NewDecoder(response.Body).Decode(&responseDc)
	if responseDc != expectedDc {
		c.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
			expectedDc, responseDc)
	}
	// Confirm the block has NOT been deleted.
	_, err = vols[0].Get(context.Background(), TestHash, buf)
	if err != nil {
		c.Errorf("testing delete on new block: %s\n", err)
	}
}

// TestPullHandler
//
// Test handling of the PUT /pull statement.
//
// Cases tested: syntactically valid and invalid pull lists, from the
// data manager and from unprivileged users:
//
//  1. Valid pull list from an ordinary user
//     (expected result: 401 Unauthorized)
//
//  2. Invalid pull request from an ordinary user
//     (expected result: 401 Unauthorized)
//
//  3. Valid pull request from the data manager
//     (expected result: 200 OK with request body "Received 3 pull
//     requests")
//
//  4. Invalid pull request from the data manager
//     (expected result: 400 Bad Request)
//
// Test that in the end, the pull manager received a good pull list with
// the expected number of requests.
//
// TODO(twp): test concurrency: launch 100 goroutines to update the
// pull list simultaneously.  Make sure that none of them return 400
// Bad Request and that pullq.GetList() returns a valid list.
func (s *HandlerSuite) TestPullHandler(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	// Replace the router's pullq -- which the worker goroutines
	// started by setup() are now receiving from -- with a new
	// one, so we can see what the handler sends to it.
	pullq := NewWorkQueue()
	s.handler.Handler.(*router).pullq = pullq

	var userToken = "USER TOKEN"
	s.cluster.SystemRootToken = "DATA MANAGER TOKEN"

	goodJSON := []byte(`[
		{
			"locator":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345",
			"servers":[
				"http://server1",
				"http://server2"
			]
		},
		{
			"locator":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+12345",
			"servers":[]
		},
		{
			"locator":"cccccccccccccccccccccccccccccccc+12345",
			"servers":["http://server1"]
		}
	]`)

	badJSON := []byte(`{ "key":"I'm a little teapot" }`)

	type pullTest struct {
		name         string
		req          RequestTester
		responseCode int
		responseBody string
	}
	var testcases = []pullTest{
		{
			"Valid pull list from an ordinary user",
			RequestTester{"/pull", userToken, "PUT", goodJSON, ""},
			http.StatusUnauthorized,
			"Unauthorized\n",
		},
		{
			"Invalid pull request from an ordinary user",
			RequestTester{"/pull", userToken, "PUT", badJSON, ""},
			http.StatusUnauthorized,
			"Unauthorized\n",
		},
		{
			"Valid pull request from the data manager",
			RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON, ""},
			http.StatusOK,
			"Received 3 pull requests\n",
		},
		{
			"Invalid pull request from the data manager",
			RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON, ""},
			http.StatusBadRequest,
			"",
		},
	}

	for _, tst := range testcases {
		response := IssueRequest(s.handler, &tst.req)
		ExpectStatusCode(c, tst.name, tst.responseCode, response)
		ExpectBody(c, tst.name, tst.responseBody, response)
	}

	// The Keep pull manager should have received one good list with 3
	// requests on it.
	for i := 0; i < 3; i++ {
		var item interface{}
		select {
		case item = <-pullq.NextItem:
		case <-time.After(time.Second):
			c.Error("timed out")
		}
		if _, ok := item.(PullRequest); !ok {
			c.Errorf("item %v could not be parsed as a PullRequest", item)
		}
	}

	expectChannelEmpty(c, pullq.NextItem)
}

// TestTrashHandler
//
// Test handling of the PUT /trash statement.
//
// Cases tested: syntactically valid and invalid trash lists, from the
// data manager and from unprivileged users:
//
//  1. Valid trash list from an ordinary user
//     (expected result: 401 Unauthorized)
//
//  2. Invalid trash list from an ordinary user
//     (expected result: 401 Unauthorized)
//
//  3. Valid trash list from the data manager
//     (expected result: 200 OK with request body "Received 3 trash
//     requests")
//
//  4. Invalid trash list from the data manager
//     (expected result: 400 Bad Request)
//
// Test that in the end, the trash collector received a good trash list
// with the expected number of requests.
//
// TODO(twp): test concurrency: launch 100 goroutines to update the
// trash list simultaneously.  Make sure that none of them return 400
// Bad Request and that trashq.GetList() returns a valid list.
func (s *HandlerSuite) TestTrashHandler(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
	// Replace the router's trashq -- which the worker goroutines
	// started by setup() are now receiving from -- with a new
	// one, so we can see what the handler sends to it.
	trashq := NewWorkQueue()
	s.handler.Handler.(*router).trashq = trashq

	var userToken = "USER TOKEN"
	s.cluster.SystemRootToken = "DATA MANAGER TOKEN"

	goodJSON := []byte(`[
		{
			"locator":"block1",
			"block_mtime":1409082153
		},
		{
			"locator":"block2",
			"block_mtime":1409082153
		},
		{
			"locator":"block3",
			"block_mtime":1409082153
		}
	]`)

	badJSON := []byte(`I am not a valid JSON string`)

	type trashTest struct {
		name         string
		req          RequestTester
		responseCode int
		responseBody string
	}

	var testcases = []trashTest{
		{
			"Valid trash list from an ordinary user",
			RequestTester{"/trash", userToken, "PUT", goodJSON, ""},
			http.StatusUnauthorized,
			"Unauthorized\n",
		},
		{
			"Invalid trash list from an ordinary user",
			RequestTester{"/trash", userToken, "PUT", badJSON, ""},
			http.StatusUnauthorized,
			"Unauthorized\n",
		},
		{
			"Valid trash list from the data manager",
			RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON, ""},
			http.StatusOK,
			"Received 3 trash requests\n",
		},
		{
			"Invalid trash list from the data manager",
			RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON, ""},
			http.StatusBadRequest,
			"",
		},
	}

	for _, tst := range testcases {
		response := IssueRequest(s.handler, &tst.req)
		ExpectStatusCode(c, tst.name, tst.responseCode, response)
		ExpectBody(c, tst.name, tst.responseBody, response)
	}

	// The trash collector should have received one good list with 3
	// requests on it.
	for i := 0; i < 3; i++ {
		item := <-trashq.NextItem
		if _, ok := item.(TrashRequest); !ok {
			c.Errorf("item %v could not be parsed as a TrashRequest", item)
		}
	}

	expectChannelEmpty(c, trashq.NextItem)
}

// ====================
// Helper functions
// ====================

// IssueRequest executes an HTTP request described by rt, to a
// REST router.  It returns the HTTP response to the request.
func IssueRequest(handler http.Handler, rt *RequestTester) *httptest.ResponseRecorder {
	response := httptest.NewRecorder()
	body := bytes.NewReader(rt.requestBody)
	req, _ := http.NewRequest(rt.method, rt.uri, body)
	if rt.apiToken != "" {
		req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
	}
	if rt.storageClasses != "" {
		req.Header.Set("X-Keep-Storage-Classes", rt.storageClasses)
	}
	handler.ServeHTTP(response, req)
	return response
}

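// IssueHealthCheckRequest is like IssueRequest, except it sends the
// API token in a "Bearer" Authorization header instead of "OAuth2".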
func IssueHealthCheckRequest(handler http.Handler, rt *RequestTester) *httptest.ResponseRecorder {
	response := httptest.NewRecorder()
	body := bytes.NewReader(rt.requestBody)
	req, _ := http.NewRequest(rt.method, rt.uri, body)
	if rt.apiToken != "" {
		req.Header.Set("Authorization", "Bearer "+rt.apiToken)
	}
	handler.ServeHTTP(response, req)
	return response
}

// ExpectStatusCode checks whether a response has the specified status code,
// and reports a test failure if not.
func ExpectStatusCode(
	c *check.C,
	testname string,
	expectedStatus int,
	response *httptest.ResponseRecorder) {
	c.Check(response.Code, check.Equals, expectedStatus, check.Commentf("%s", testname))
}

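// ExpectBody checks whether a response body matches expectedBody (when
// expectedBody is non-empty), and reports a test failure if not.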
func ExpectBody(
	c *check.C,
	testname string,
	expectedBody string,
	response *httptest.ResponseRecorder) {
	if expectedBody != "" && response.Body.String() != expectedBody {
		c.Errorf("%s: expected response body '%s', got %+v",
			testname, expectedBody, response)
	}
}

// See #7121
func (s *HandlerSuite) TestPutNeedsOnlyOneBuffer(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	defer func(orig *bufferPool) {
		bufs = orig
	}(bufs)
	bufs = newBufferPool(ctxlog.TestLogger(c), 1, BlockSize)

	ok := make(chan struct{})
	go func() {
		for i := 0; i < 2; i++ {
			response := IssueRequest(s.handler,
				&RequestTester{
					method:      "PUT",
					uri:         "/" + TestHash,
					requestBody: TestBlock,
				})
			ExpectStatusCode(c,
				"TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
		}
		ok <- struct{}{}
	}()

	select {
	case <-ok:
	case <-time.After(time.Second):
		c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
	}
}

// Invoke the PutBlockHandler a bunch of times to test for bufferpool
// resource leaks.
func (s *HandlerSuite) TestPutHandlerNoBufferleak(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	ok := make(chan bool)
	go func() {
		for i := 0; i < s.cluster.API.MaxKeepBlobBuffers+1; i++ {
			// Unauthenticated request, no server key
			// => OK (unsigned response)
			unsignedLocator := "/" + TestHash
			response := IssueRequest(s.handler,
				&RequestTester{
					method:      "PUT",
					uri:         unsignedLocator,
					requestBody: TestBlock,
				})
			ExpectStatusCode(c,
				"TestPutHandlerBufferleak", http.StatusOK, response)
			ExpectBody(c,
				"TestPutHandlerBufferleak",
				TestHashPutResp, response)
		}
		ok <- true
	}()
	select {
	case <-time.After(20 * time.Second):
		// If the buffer pool leaks, the test goroutine hangs.
		c.Fatal("test did not finish, assuming pool leaked")
	case <-ok:
	}
}

func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
	s.cluster.Collections.BlobSigning = false
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	defer func(orig *bufferPool) {
		bufs = orig
	}(bufs)
	bufs = newBufferPool(ctxlog.TestLogger(c), 1, BlockSize)
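	// Grab the only buffer now and return it when the test ends (the
	// Get runs immediately, the Put is deferred), so the handler below
	// has to wait for a buffer, giving it time to notice the canceled
	// request context before touching any volume.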
	defer bufs.Put(bufs.Get(BlockSize))

	err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock)
	c.Assert(err, check.IsNil)

	resp := httptest.NewRecorder()
	ok := make(chan struct{})
	go func() {
		ctx, cancel := context.WithCancel(context.Background())
		req, _ := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
		cancel()
		s.handler.ServeHTTP(resp, req)
		ok <- struct{}{}
	}()

	select {
	case <-time.After(20 * time.Second):
		c.Fatal("request took >20s, close notifier must be broken")
	case <-ok:
	}

	ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp)
	for i, v := range s.handler.volmgr.AllWritable() {
		if calls := v.Volume.(*MockVolume).called["GET"]; calls != 0 {
			c.Errorf("volume %d got %d calls, expected 0", i, calls)
		}
	}
}

// Invoke the GetBlockHandler a bunch of times to test for bufferpool
// resource leaks.
func (s *HandlerSuite) TestGetHandlerNoBufferLeak(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	vols := s.handler.volmgr.AllWritable()
	if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
		c.Error(err)
	}

	ok := make(chan bool)
	go func() {
		for i := 0; i < s.cluster.API.MaxKeepBlobBuffers+1; i++ {
			// Unauthenticated request, unsigned locator
			// => OK
			unsignedLocator := "/" + TestHash
			response := IssueRequest(s.handler,
				&RequestTester{
					method: "GET",
					uri:    unsignedLocator,
				})
			ExpectStatusCode(c,
				"Unauthenticated request, unsigned locator", http.StatusOK, response)
			ExpectBody(c,
				"Unauthenticated request, unsigned locator",
				string(TestBlock),
				response)
		}
		ok <- true
	}()
	select {
	case <-time.After(20 * time.Second):
		// If the buffer pool leaks, the test goroutine hangs.
		c.Fatal("test did not finish, assuming pool leaked")
	case <-ok:
	}
}

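// TestPutStorageClasses checks the X-Keep-Storage-Classes-Confirmed
// header returned by PUT requests asking for various storage class
// combinations.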
func (s *HandlerSuite) TestPutStorageClasses(c *check.C) {
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "mock"}, // "default" is implicit
		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"special": true, "extra": true}},
		"zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
	}
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
	rt := RequestTester{
		method:      "PUT",
		uri:         "/" + TestHash,
		requestBody: TestBlock,
	}

	for _, trial := range []struct {
		ask    string
		expect string
	}{
		{"", ""},
		{"default", "default=1"},
		{" , default , default , ", "default=1"},
		{"special", "extra=1, special=1"},
		{"special, readonly", "extra=1, special=1"},
		{"special, nonexistent", "extra=1, special=1"},
		{"extra, special", "extra=1, special=1"},
		{"default, special", "default=1, extra=1, special=1"},
	} {
		c.Logf("success case %#v", trial)
		rt.storageClasses = trial.ask
		resp := IssueRequest(s.handler, &rt)
		if trial.expect == "" {
			// any non-empty value is correct
			c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), check.Not(check.Equals), "")
		} else {
			c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), check.Equals, trial.expect)
		}
	}

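	// Failure cases: each of these requests names at least one storage
	// class that no writable volume offers ("readonly" is offered only
	// by a read-only volume), so the write cannot be satisfied.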
	for _, trial := range []struct {
		ask string
	}{
		{"doesnotexist"},
		{"doesnotexist, readonly"},
		{"readonly"},
	} {
		c.Logf("failure case %#v", trial)
		rt.storageClasses = trial.ask
		resp := IssueRequest(s.handler, &rt)
		c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
	}
}

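// sortCommaSeparated sorts the elements of a comma-separated list so
// header values can be compared regardless of element order.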
func sortCommaSeparated(s string) string {
	slice := strings.Split(s, ", ")
	sort.Strings(slice)
	return strings.Join(slice, ", ")
}

func (s *HandlerSuite) TestPutResponseHeader(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	resp := IssueRequest(s.handler, &RequestTester{
		method:      "PUT",
		uri:         "/" + TestHash,
		requestBody: TestBlock,
	})
	c.Logf("%#v", resp)
	c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), check.Equals, "1")
	c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), check.Equals, "default=1")
}

func (s *HandlerSuite) TestUntrashHandler(c *check.C) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	// Set up Keep volumes
	vols := s.handler.volmgr.AllWritable()
	vols[0].Put(context.Background(), TestHash, TestBlock)

	s.cluster.SystemRootToken = "DATA MANAGER TOKEN"

	// unauthenticatedReq => UnauthorizedError
	unauthenticatedReq := &RequestTester{
		method: "PUT",
		uri:    "/untrash/" + TestHash,
	}
	response := IssueRequest(s.handler, unauthenticatedReq)
	ExpectStatusCode(c,
		"Unauthenticated request",
		UnauthorizedError.HTTPCode,
		response)

	// notDataManagerReq => UnauthorizedError
	notDataManagerReq := &RequestTester{
		method:   "PUT",
		uri:      "/untrash/" + TestHash,
		apiToken: knownToken,
	}

	response = IssueRequest(s.handler, notDataManagerReq)
	ExpectStatusCode(c,
		"Non-datamanager token",
		UnauthorizedError.HTTPCode,
		response)

	// datamanagerWithBadHashReq => StatusBadRequest
	datamanagerWithBadHashReq := &RequestTester{
		method:   "PUT",
		uri:      "/untrash/thisisnotalocator",
		apiToken: s.cluster.SystemRootToken,
	}
	response = IssueRequest(s.handler, datamanagerWithBadHashReq)
	ExpectStatusCode(c,
		"Bad locator in untrash request",
		http.StatusBadRequest,
		response)

	// datamanagerWrongMethodReq => StatusMethodNotAllowed
	datamanagerWrongMethodReq := &RequestTester{
		method:   "GET",
		uri:      "/untrash/" + TestHash,
		apiToken: s.cluster.SystemRootToken,
	}
	response = IssueRequest(s.handler, datamanagerWrongMethodReq)
	ExpectStatusCode(c,
		"Only PUT method is supported for untrash",
		http.StatusMethodNotAllowed,
		response)

	// datamanagerReq => StatusOK
	datamanagerReq := &RequestTester{
		method:   "PUT",
		uri:      "/untrash/" + TestHash,
		apiToken: s.cluster.SystemRootToken,
	}
	response = IssueRequest(s.handler, datamanagerReq)
	ExpectStatusCode(c,
		"",
		http.StatusOK,
		response)
	c.Check(response.Body.String(), check.Equals, "Successfully untrashed on: [MockVolume], [MockVolume]\n")
}

func (s *HandlerSuite) TestUntrashHandlerWithNoWritableVolumes(c *check.C) {
	// Change all volumes to read-only
	for uuid, v := range s.cluster.Volumes {
		v.ReadOnly = true
		s.cluster.Volumes[uuid] = v
	}
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)

	// datamanagerReq => StatusNotFound (no writable volumes)
	datamanagerReq := &RequestTester{
		method:   "PUT",
		uri:      "/untrash/" + TestHash,
		apiToken: s.cluster.SystemRootToken,
	}
	response := IssueRequest(s.handler, datamanagerReq)
	ExpectStatusCode(c,
		"No writable volumes",
		http.StatusNotFound,
		response)
}

func (s *HandlerSuite) TestHealthCheckPing(c *check.C) {
	s.cluster.ManagementToken = arvadostest.ManagementToken
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
	pingReq := &RequestTester{
		method:   "GET",
		uri:      "/_health/ping",
		apiToken: arvadostest.ManagementToken,
	}
	response := IssueHealthCheckRequest(s.handler, pingReq)
	ExpectStatusCode(c,
		"",
		http.StatusOK,
		response)
	want := `{"health":"OK"}`
	if !strings.Contains(response.Body.String(), want) {
		c.Errorf("expected response to include %s: got %s", want, response.Body.String())
	}
}