Merge branch '9865-cwl-fix-ignored-exceptions'
[arvados.git] / services / keepstore / handler_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Tests for Keep HTTP handlers:
6 //
7 //     GetBlockHandler
8 //     PutBlockHandler
9 //     IndexHandler
10 //
11 // The HTTP handlers are responsible for enforcing permission policy,
12 // so these tests must exercise all possible permission permutations.
13
14 package main
15
16 import (
17         "bytes"
18         "context"
19         "encoding/json"
20         "fmt"
21         "net/http"
22         "net/http/httptest"
23         "os"
24         "regexp"
25         "strings"
26         "testing"
27         "time"
28
29         "git.curoverse.com/arvados.git/sdk/go/arvados"
30         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
31 )
32
33 var testCluster = &arvados.Cluster{
34         ClusterID: "zzzzz",
35 }
36
// RequestTester describes one HTTP request that a unit test wants to
// send to the router.
//
// Some tests in this file build it with positional literals, so the
// field order (uri, apiToken, method, requestBody) must not change.
type RequestTester struct {
	uri, apiToken, method string // apiToken == "" means no Authorization header is sent
	requestBody           []byte
}
45
46 // Test GetBlockHandler on the following situations:
47 //   - permissions off, unauthenticated request, unsigned locator
48 //   - permissions on, authenticated request, signed locator
49 //   - permissions on, authenticated request, unsigned locator
50 //   - permissions on, unauthenticated request, signed locator
51 //   - permissions on, authenticated request, expired locator
52 //   - permissions on, authenticated request, signed locator, transient error from backend
53 //
54 func TestGetHandler(t *testing.T) {
55         defer teardown()
56
57         // Prepare two test Keep volumes. Our block is stored on the second volume.
58         KeepVM = MakeTestVolumeManager(2)
59         defer KeepVM.Close()
60
61         vols := KeepVM.AllWritable()
62         if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
63                 t.Error(err)
64         }
65
66         // Create locators for testing.
67         // Turn on permission settings so we can generate signed locators.
68         theConfig.RequireSignatures = true
69         theConfig.blobSigningKey = []byte(knownKey)
70         theConfig.BlobSignatureTTL.Set("5m")
71
72         var (
73                 unsignedLocator  = "/" + TestHash
74                 validTimestamp   = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
75                 expiredTimestamp = time.Now().Add(-time.Hour)
76                 signedLocator    = "/" + SignLocator(TestHash, knownToken, validTimestamp)
77                 expiredLocator   = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
78         )
79
80         // -----------------
81         // Test unauthenticated request with permissions off.
82         theConfig.RequireSignatures = false
83
84         // Unauthenticated request, unsigned locator
85         // => OK
86         response := IssueRequest(
87                 &RequestTester{
88                         method: "GET",
89                         uri:    unsignedLocator,
90                 })
91         ExpectStatusCode(t,
92                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
93         ExpectBody(t,
94                 "Unauthenticated request, unsigned locator",
95                 string(TestBlock),
96                 response)
97
98         receivedLen := response.Header().Get("Content-Length")
99         expectedLen := fmt.Sprintf("%d", len(TestBlock))
100         if receivedLen != expectedLen {
101                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
102         }
103
104         // ----------------
105         // Permissions: on.
106         theConfig.RequireSignatures = true
107
108         // Authenticated request, signed locator
109         // => OK
110         response = IssueRequest(&RequestTester{
111                 method:   "GET",
112                 uri:      signedLocator,
113                 apiToken: knownToken,
114         })
115         ExpectStatusCode(t,
116                 "Authenticated request, signed locator", http.StatusOK, response)
117         ExpectBody(t,
118                 "Authenticated request, signed locator", string(TestBlock), response)
119
120         receivedLen = response.Header().Get("Content-Length")
121         expectedLen = fmt.Sprintf("%d", len(TestBlock))
122         if receivedLen != expectedLen {
123                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
124         }
125
126         // Authenticated request, unsigned locator
127         // => PermissionError
128         response = IssueRequest(&RequestTester{
129                 method:   "GET",
130                 uri:      unsignedLocator,
131                 apiToken: knownToken,
132         })
133         ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
134
135         // Unauthenticated request, signed locator
136         // => PermissionError
137         response = IssueRequest(&RequestTester{
138                 method: "GET",
139                 uri:    signedLocator,
140         })
141         ExpectStatusCode(t,
142                 "Unauthenticated request, signed locator",
143                 PermissionError.HTTPCode, response)
144
145         // Authenticated request, expired locator
146         // => ExpiredError
147         response = IssueRequest(&RequestTester{
148                 method:   "GET",
149                 uri:      expiredLocator,
150                 apiToken: knownToken,
151         })
152         ExpectStatusCode(t,
153                 "Authenticated request, expired locator",
154                 ExpiredError.HTTPCode, response)
155
156         // Authenticated request, signed locator
157         // => 503 Server busy (transient error)
158
159         // Set up the block owning volume to respond with errors
160         vols[0].(*MockVolume).Bad = true
161         vols[0].(*MockVolume).BadVolumeError = VolumeBusyError
162         response = IssueRequest(&RequestTester{
163                 method:   "GET",
164                 uri:      signedLocator,
165                 apiToken: knownToken,
166         })
167         // A transient error from one volume while the other doesn't find the block
168         // should make the service return a 503 so that clients can retry.
169         ExpectStatusCode(t,
170                 "Volume backend busy",
171                 503, response)
172 }
173
174 // Test PutBlockHandler on the following situations:
175 //   - no server key
176 //   - with server key, authenticated request, unsigned locator
177 //   - with server key, unauthenticated request, unsigned locator
178 //
179 func TestPutHandler(t *testing.T) {
180         defer teardown()
181
182         // Prepare two test Keep volumes.
183         KeepVM = MakeTestVolumeManager(2)
184         defer KeepVM.Close()
185
186         // --------------
187         // No server key.
188
189         // Unauthenticated request, no server key
190         // => OK (unsigned response)
191         unsignedLocator := "/" + TestHash
192         response := IssueRequest(
193                 &RequestTester{
194                         method:      "PUT",
195                         uri:         unsignedLocator,
196                         requestBody: TestBlock,
197                 })
198
199         ExpectStatusCode(t,
200                 "Unauthenticated request, no server key", http.StatusOK, response)
201         ExpectBody(t,
202                 "Unauthenticated request, no server key",
203                 TestHashPutResp, response)
204
205         // ------------------
206         // With a server key.
207
208         theConfig.blobSigningKey = []byte(knownKey)
209         theConfig.BlobSignatureTTL.Set("5m")
210
211         // When a permission key is available, the locator returned
212         // from an authenticated PUT request will be signed.
213
214         // Authenticated PUT, signed locator
215         // => OK (signed response)
216         response = IssueRequest(
217                 &RequestTester{
218                         method:      "PUT",
219                         uri:         unsignedLocator,
220                         requestBody: TestBlock,
221                         apiToken:    knownToken,
222                 })
223
224         ExpectStatusCode(t,
225                 "Authenticated PUT, signed locator, with server key",
226                 http.StatusOK, response)
227         responseLocator := strings.TrimSpace(response.Body.String())
228         if VerifySignature(responseLocator, knownToken) != nil {
229                 t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
230                         "response '%s' does not contain a valid signature",
231                         responseLocator)
232         }
233
234         // Unauthenticated PUT, unsigned locator
235         // => OK
236         response = IssueRequest(
237                 &RequestTester{
238                         method:      "PUT",
239                         uri:         unsignedLocator,
240                         requestBody: TestBlock,
241                 })
242
243         ExpectStatusCode(t,
244                 "Unauthenticated PUT, unsigned locator, with server key",
245                 http.StatusOK, response)
246         ExpectBody(t,
247                 "Unauthenticated PUT, unsigned locator, with server key",
248                 TestHashPutResp, response)
249 }
250
251 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
252         defer teardown()
253         theConfig.systemAuthToken = "fake-data-manager-token"
254         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
255         vols[0].Readonly = true
256         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
257         defer KeepVM.Close()
258         IssueRequest(
259                 &RequestTester{
260                         method:      "PUT",
261                         uri:         "/" + TestHash,
262                         requestBody: TestBlock,
263                 })
264         defer func(orig bool) {
265                 theConfig.EnableDelete = orig
266         }(theConfig.EnableDelete)
267         theConfig.EnableDelete = true
268         IssueRequest(
269                 &RequestTester{
270                         method:      "DELETE",
271                         uri:         "/" + TestHash,
272                         requestBody: TestBlock,
273                         apiToken:    theConfig.systemAuthToken,
274                 })
275         type expect struct {
276                 volnum    int
277                 method    string
278                 callcount int
279         }
280         for _, e := range []expect{
281                 {0, "Get", 0},
282                 {0, "Compare", 0},
283                 {0, "Touch", 0},
284                 {0, "Put", 0},
285                 {0, "Delete", 0},
286                 {1, "Get", 0},
287                 {1, "Compare", 1},
288                 {1, "Touch", 1},
289                 {1, "Put", 1},
290                 {1, "Delete", 1},
291         } {
292                 if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
293                         t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
294                 }
295         }
296 }
297
298 // Test /index requests:
299 //   - unauthenticated /index request
300 //   - unauthenticated /index/prefix request
301 //   - authenticated   /index request        | non-superuser
302 //   - authenticated   /index/prefix request | non-superuser
303 //   - authenticated   /index request        | superuser
304 //   - authenticated   /index/prefix request | superuser
305 //
306 // The only /index requests that should succeed are those issued by the
307 // superuser. They should pass regardless of the value of RequireSignatures.
308 //
309 func TestIndexHandler(t *testing.T) {
310         defer teardown()
311
312         // Set up Keep volumes and populate them.
313         // Include multiple blocks on different volumes, and
314         // some metadata files (which should be omitted from index listings)
315         KeepVM = MakeTestVolumeManager(2)
316         defer KeepVM.Close()
317
318         vols := KeepVM.AllWritable()
319         vols[0].Put(context.Background(), TestHash, TestBlock)
320         vols[1].Put(context.Background(), TestHash2, TestBlock2)
321         vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
322         vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
323
324         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
325
326         unauthenticatedReq := &RequestTester{
327                 method: "GET",
328                 uri:    "/index",
329         }
330         authenticatedReq := &RequestTester{
331                 method:   "GET",
332                 uri:      "/index",
333                 apiToken: knownToken,
334         }
335         superuserReq := &RequestTester{
336                 method:   "GET",
337                 uri:      "/index",
338                 apiToken: theConfig.systemAuthToken,
339         }
340         unauthPrefixReq := &RequestTester{
341                 method: "GET",
342                 uri:    "/index/" + TestHash[0:3],
343         }
344         authPrefixReq := &RequestTester{
345                 method:   "GET",
346                 uri:      "/index/" + TestHash[0:3],
347                 apiToken: knownToken,
348         }
349         superuserPrefixReq := &RequestTester{
350                 method:   "GET",
351                 uri:      "/index/" + TestHash[0:3],
352                 apiToken: theConfig.systemAuthToken,
353         }
354         superuserNoSuchPrefixReq := &RequestTester{
355                 method:   "GET",
356                 uri:      "/index/abcd",
357                 apiToken: theConfig.systemAuthToken,
358         }
359         superuserInvalidPrefixReq := &RequestTester{
360                 method:   "GET",
361                 uri:      "/index/xyz",
362                 apiToken: theConfig.systemAuthToken,
363         }
364
365         // -------------------------------------------------------------
366         // Only the superuser should be allowed to issue /index requests.
367
368         // ---------------------------
369         // RequireSignatures enabled
370         // This setting should not affect tests passing.
371         theConfig.RequireSignatures = true
372
373         // unauthenticated /index request
374         // => UnauthorizedError
375         response := IssueRequest(unauthenticatedReq)
376         ExpectStatusCode(t,
377                 "RequireSignatures on, unauthenticated request",
378                 UnauthorizedError.HTTPCode,
379                 response)
380
381         // unauthenticated /index/prefix request
382         // => UnauthorizedError
383         response = IssueRequest(unauthPrefixReq)
384         ExpectStatusCode(t,
385                 "permissions on, unauthenticated /index/prefix request",
386                 UnauthorizedError.HTTPCode,
387                 response)
388
389         // authenticated /index request, non-superuser
390         // => UnauthorizedError
391         response = IssueRequest(authenticatedReq)
392         ExpectStatusCode(t,
393                 "permissions on, authenticated request, non-superuser",
394                 UnauthorizedError.HTTPCode,
395                 response)
396
397         // authenticated /index/prefix request, non-superuser
398         // => UnauthorizedError
399         response = IssueRequest(authPrefixReq)
400         ExpectStatusCode(t,
401                 "permissions on, authenticated /index/prefix request, non-superuser",
402                 UnauthorizedError.HTTPCode,
403                 response)
404
405         // superuser /index request
406         // => OK
407         response = IssueRequest(superuserReq)
408         ExpectStatusCode(t,
409                 "permissions on, superuser request",
410                 http.StatusOK,
411                 response)
412
413         // ----------------------------
414         // RequireSignatures disabled
415         // Valid Request should still pass.
416         theConfig.RequireSignatures = false
417
418         // superuser /index request
419         // => OK
420         response = IssueRequest(superuserReq)
421         ExpectStatusCode(t,
422                 "permissions on, superuser request",
423                 http.StatusOK,
424                 response)
425
426         expected := `^` + TestHash + `\+\d+ \d+\n` +
427                 TestHash2 + `\+\d+ \d+\n\n$`
428         match, _ := regexp.MatchString(expected, response.Body.String())
429         if !match {
430                 t.Errorf(
431                         "permissions on, superuser request: expected %s, got:\n%s",
432                         expected, response.Body.String())
433         }
434
435         // superuser /index/prefix request
436         // => OK
437         response = IssueRequest(superuserPrefixReq)
438         ExpectStatusCode(t,
439                 "permissions on, superuser request",
440                 http.StatusOK,
441                 response)
442
443         expected = `^` + TestHash + `\+\d+ \d+\n\n$`
444         match, _ = regexp.MatchString(expected, response.Body.String())
445         if !match {
446                 t.Errorf(
447                         "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
448                         expected, response.Body.String())
449         }
450
451         // superuser /index/{no-such-prefix} request
452         // => OK
453         response = IssueRequest(superuserNoSuchPrefixReq)
454         ExpectStatusCode(t,
455                 "permissions on, superuser request",
456                 http.StatusOK,
457                 response)
458
459         if "\n" != response.Body.String() {
460                 t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
461         }
462
463         // superuser /index/{invalid-prefix} request
464         // => StatusBadRequest
465         response = IssueRequest(superuserInvalidPrefixReq)
466         ExpectStatusCode(t,
467                 "permissions on, superuser request",
468                 http.StatusBadRequest,
469                 response)
470 }
471
472 // TestDeleteHandler
473 //
474 // Cases tested:
475 //
476 //   With no token and with a non-data-manager token:
477 //   * Delete existing block
478 //     (test for 403 Forbidden, confirm block not deleted)
479 //
480 //   With data manager token:
481 //
482 //   * Delete existing block
483 //     (test for 200 OK, response counts, confirm block deleted)
484 //
485 //   * Delete nonexistent block
486 //     (test for 200 OK, response counts)
487 //
488 //   TODO(twp):
489 //
490 //   * Delete block on read-only and read-write volume
491 //     (test for 200 OK, response with copies_deleted=1,
492 //     copies_failed=1, confirm block deleted only on r/w volume)
493 //
494 //   * Delete block on read-only volume only
495 //     (test for 200 OK, response with copies_deleted=0, copies_failed=1,
496 //     confirm block not deleted)
497 //
498 func TestDeleteHandler(t *testing.T) {
499         defer teardown()
500
501         // Set up Keep volumes and populate them.
502         // Include multiple blocks on different volumes, and
503         // some metadata files (which should be omitted from index listings)
504         KeepVM = MakeTestVolumeManager(2)
505         defer KeepVM.Close()
506
507         vols := KeepVM.AllWritable()
508         vols[0].Put(context.Background(), TestHash, TestBlock)
509
510         // Explicitly set the BlobSignatureTTL to 0 for these
511         // tests, to ensure the MockVolume deletes the blocks
512         // even though they have just been created.
513         theConfig.BlobSignatureTTL = arvados.Duration(0)
514
515         var userToken = "NOT DATA MANAGER TOKEN"
516         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
517
518         theConfig.EnableDelete = true
519
520         unauthReq := &RequestTester{
521                 method: "DELETE",
522                 uri:    "/" + TestHash,
523         }
524
525         userReq := &RequestTester{
526                 method:   "DELETE",
527                 uri:      "/" + TestHash,
528                 apiToken: userToken,
529         }
530
531         superuserExistingBlockReq := &RequestTester{
532                 method:   "DELETE",
533                 uri:      "/" + TestHash,
534                 apiToken: theConfig.systemAuthToken,
535         }
536
537         superuserNonexistentBlockReq := &RequestTester{
538                 method:   "DELETE",
539                 uri:      "/" + TestHash2,
540                 apiToken: theConfig.systemAuthToken,
541         }
542
543         // Unauthenticated request returns PermissionError.
544         var response *httptest.ResponseRecorder
545         response = IssueRequest(unauthReq)
546         ExpectStatusCode(t,
547                 "unauthenticated request",
548                 PermissionError.HTTPCode,
549                 response)
550
551         // Authenticated non-admin request returns PermissionError.
552         response = IssueRequest(userReq)
553         ExpectStatusCode(t,
554                 "authenticated non-admin request",
555                 PermissionError.HTTPCode,
556                 response)
557
558         // Authenticated admin request for nonexistent block.
559         type deletecounter struct {
560                 Deleted int `json:"copies_deleted"`
561                 Failed  int `json:"copies_failed"`
562         }
563         var responseDc, expectedDc deletecounter
564
565         response = IssueRequest(superuserNonexistentBlockReq)
566         ExpectStatusCode(t,
567                 "data manager request, nonexistent block",
568                 http.StatusNotFound,
569                 response)
570
571         // Authenticated admin request for existing block while EnableDelete is false.
572         theConfig.EnableDelete = false
573         response = IssueRequest(superuserExistingBlockReq)
574         ExpectStatusCode(t,
575                 "authenticated request, existing block, method disabled",
576                 MethodDisabledError.HTTPCode,
577                 response)
578         theConfig.EnableDelete = true
579
580         // Authenticated admin request for existing block.
581         response = IssueRequest(superuserExistingBlockReq)
582         ExpectStatusCode(t,
583                 "data manager request, existing block",
584                 http.StatusOK,
585                 response)
586         // Expect response {"copies_deleted":1,"copies_failed":0}
587         expectedDc = deletecounter{1, 0}
588         json.NewDecoder(response.Body).Decode(&responseDc)
589         if responseDc != expectedDc {
590                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
591                         expectedDc, responseDc)
592         }
593         // Confirm the block has been deleted
594         buf := make([]byte, BlockSize)
595         _, err := vols[0].Get(context.Background(), TestHash, buf)
596         var blockDeleted = os.IsNotExist(err)
597         if !blockDeleted {
598                 t.Error("superuserExistingBlockReq: block not deleted")
599         }
600
601         // A DELETE request on a block newer than BlobSignatureTTL
602         // should return success but leave the block on the volume.
603         vols[0].Put(context.Background(), TestHash, TestBlock)
604         theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
605
606         response = IssueRequest(superuserExistingBlockReq)
607         ExpectStatusCode(t,
608                 "data manager request, existing block",
609                 http.StatusOK,
610                 response)
611         // Expect response {"copies_deleted":1,"copies_failed":0}
612         expectedDc = deletecounter{1, 0}
613         json.NewDecoder(response.Body).Decode(&responseDc)
614         if responseDc != expectedDc {
615                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
616                         expectedDc, responseDc)
617         }
618         // Confirm the block has NOT been deleted.
619         _, err = vols[0].Get(context.Background(), TestHash, buf)
620         if err != nil {
621                 t.Errorf("testing delete on new block: %s\n", err)
622         }
623 }
624
625 // TestPullHandler
626 //
627 // Test handling of the PUT /pull statement.
628 //
629 // Cases tested: syntactically valid and invalid pull lists, from the
630 // data manager and from unprivileged users:
631 //
632 //   1. Valid pull list from an ordinary user
633 //      (expected result: 401 Unauthorized)
634 //
635 //   2. Invalid pull request from an ordinary user
636 //      (expected result: 401 Unauthorized)
637 //
638 //   3. Valid pull request from the data manager
639 //      (expected result: 200 OK with request body "Received 3 pull
640 //      requests"
641 //
642 //   4. Invalid pull request from the data manager
643 //      (expected result: 400 Bad Request)
644 //
645 // Test that in the end, the pull manager received a good pull list with
646 // the expected number of requests.
647 //
648 // TODO(twp): test concurrency: launch 100 goroutines to update the
649 // pull list simultaneously.  Make sure that none of them return 400
650 // Bad Request and that pullq.GetList() returns a valid list.
651 //
652 func TestPullHandler(t *testing.T) {
653         defer teardown()
654
655         var userToken = "USER TOKEN"
656         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
657
658         pullq = NewWorkQueue()
659
660         goodJSON := []byte(`[
661                 {
662                         "locator":"locator_with_two_servers",
663                         "servers":[
664                                 "server1",
665                                 "server2"
666                         ]
667                 },
668                 {
669                         "locator":"locator_with_no_servers",
670                         "servers":[]
671                 },
672                 {
673                         "locator":"",
674                         "servers":["empty_locator"]
675                 }
676         ]`)
677
678         badJSON := []byte(`{ "key":"I'm a little teapot" }`)
679
680         type pullTest struct {
681                 name         string
682                 req          RequestTester
683                 responseCode int
684                 responseBody string
685         }
686         var testcases = []pullTest{
687                 {
688                         "Valid pull list from an ordinary user",
689                         RequestTester{"/pull", userToken, "PUT", goodJSON},
690                         http.StatusUnauthorized,
691                         "Unauthorized\n",
692                 },
693                 {
694                         "Invalid pull request from an ordinary user",
695                         RequestTester{"/pull", userToken, "PUT", badJSON},
696                         http.StatusUnauthorized,
697                         "Unauthorized\n",
698                 },
699                 {
700                         "Valid pull request from the data manager",
701                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
702                         http.StatusOK,
703                         "Received 3 pull requests\n",
704                 },
705                 {
706                         "Invalid pull request from the data manager",
707                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
708                         http.StatusBadRequest,
709                         "",
710                 },
711         }
712
713         for _, tst := range testcases {
714                 response := IssueRequest(&tst.req)
715                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
716                 ExpectBody(t, tst.name, tst.responseBody, response)
717         }
718
719         // The Keep pull manager should have received one good list with 3
720         // requests on it.
721         for i := 0; i < 3; i++ {
722                 item := <-pullq.NextItem
723                 if _, ok := item.(PullRequest); !ok {
724                         t.Errorf("item %v could not be parsed as a PullRequest", item)
725                 }
726         }
727
728         expectChannelEmpty(t, pullq.NextItem)
729 }
730
731 // TestTrashHandler
732 //
733 // Test cases:
734 //
735 // Cases tested: syntactically valid and invalid trash lists, from the
736 // data manager and from unprivileged users:
737 //
738 //   1. Valid trash list from an ordinary user
739 //      (expected result: 401 Unauthorized)
740 //
741 //   2. Invalid trash list from an ordinary user
742 //      (expected result: 401 Unauthorized)
743 //
744 //   3. Valid trash list from the data manager
745 //      (expected result: 200 OK with request body "Received 3 trash
746 //      requests"
747 //
748 //   4. Invalid trash list from the data manager
749 //      (expected result: 400 Bad Request)
750 //
751 // Test that in the end, the trash collector received a good list
752 // trash list with the expected number of requests.
753 //
754 // TODO(twp): test concurrency: launch 100 goroutines to update the
755 // pull list simultaneously.  Make sure that none of them return 400
756 // Bad Request and that replica.Dump() returns a valid list.
757 //
758 func TestTrashHandler(t *testing.T) {
759         defer teardown()
760
761         var userToken = "USER TOKEN"
762         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
763
764         trashq = NewWorkQueue()
765
766         goodJSON := []byte(`[
767                 {
768                         "locator":"block1",
769                         "block_mtime":1409082153
770                 },
771                 {
772                         "locator":"block2",
773                         "block_mtime":1409082153
774                 },
775                 {
776                         "locator":"block3",
777                         "block_mtime":1409082153
778                 }
779         ]`)
780
781         badJSON := []byte(`I am not a valid JSON string`)
782
783         type trashTest struct {
784                 name         string
785                 req          RequestTester
786                 responseCode int
787                 responseBody string
788         }
789
790         var testcases = []trashTest{
791                 {
792                         "Valid trash list from an ordinary user",
793                         RequestTester{"/trash", userToken, "PUT", goodJSON},
794                         http.StatusUnauthorized,
795                         "Unauthorized\n",
796                 },
797                 {
798                         "Invalid trash list from an ordinary user",
799                         RequestTester{"/trash", userToken, "PUT", badJSON},
800                         http.StatusUnauthorized,
801                         "Unauthorized\n",
802                 },
803                 {
804                         "Valid trash list from the data manager",
805                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
806                         http.StatusOK,
807                         "Received 3 trash requests\n",
808                 },
809                 {
810                         "Invalid trash list from the data manager",
811                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
812                         http.StatusBadRequest,
813                         "",
814                 },
815         }
816
817         for _, tst := range testcases {
818                 response := IssueRequest(&tst.req)
819                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
820                 ExpectBody(t, tst.name, tst.responseBody, response)
821         }
822
823         // The trash collector should have received one good list with 3
824         // requests on it.
825         for i := 0; i < 3; i++ {
826                 item := <-trashq.NextItem
827                 if _, ok := item.(TrashRequest); !ok {
828                         t.Errorf("item %v could not be parsed as a TrashRequest", item)
829                 }
830         }
831
832         expectChannelEmpty(t, trashq.NextItem)
833 }
834
835 // ====================
836 // Helper functions
837 // ====================
838
839 // IssueTestRequest executes an HTTP request described by rt, to a
840 // REST router.  It returns the HTTP response to the request.
841 func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
842         response := httptest.NewRecorder()
843         body := bytes.NewReader(rt.requestBody)
844         req, _ := http.NewRequest(rt.method, rt.uri, body)
845         if rt.apiToken != "" {
846                 req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
847         }
848         loggingRouter := MakeRESTRouter(testCluster)
849         loggingRouter.ServeHTTP(response, req)
850         return response
851 }
852
853 func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
854         response := httptest.NewRecorder()
855         body := bytes.NewReader(rt.requestBody)
856         req, _ := http.NewRequest(rt.method, rt.uri, body)
857         if rt.apiToken != "" {
858                 req.Header.Set("Authorization", "Bearer "+rt.apiToken)
859         }
860         loggingRouter := MakeRESTRouter(testCluster)
861         loggingRouter.ServeHTTP(response, req)
862         return response
863 }
864
// ExpectStatusCode reports a test failure, labelled with testname,
// unless the recorded response carries expectedStatus. The failure
// message includes the whole recorded response.
func ExpectStatusCode(t *testing.T, testname string, expectedStatus int, response *httptest.ResponseRecorder) {
	if got := response.Code; got == expectedStatus {
		return
	}
	t.Errorf("%s: expected status %d, got %+v", testname, expectedStatus, response)
}
877
// ExpectBody reports a test failure, labelled with testname, unless
// the recorded response body equals expectedBody. An empty
// expectedBody disables the check entirely.
func ExpectBody(t *testing.T, testname string, expectedBody string, response *httptest.ResponseRecorder) {
	if expectedBody == "" {
		return
	}
	if got := response.Body.String(); got != expectedBody {
		t.Errorf("%s: expected response body '%s', got %+v", testname, expectedBody, response)
	}
}
888
889 // See #7121
890 func TestPutNeedsOnlyOneBuffer(t *testing.T) {
891         defer teardown()
892         KeepVM = MakeTestVolumeManager(1)
893         defer KeepVM.Close()
894
895         defer func(orig *bufferPool) {
896                 bufs = orig
897         }(bufs)
898         bufs = newBufferPool(1, BlockSize)
899
900         ok := make(chan struct{})
901         go func() {
902                 for i := 0; i < 2; i++ {
903                         response := IssueRequest(
904                                 &RequestTester{
905                                         method:      "PUT",
906                                         uri:         "/" + TestHash,
907                                         requestBody: TestBlock,
908                                 })
909                         ExpectStatusCode(t,
910                                 "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
911                 }
912                 ok <- struct{}{}
913         }()
914
915         select {
916         case <-ok:
917         case <-time.After(time.Second):
918                 t.Fatal("PUT deadlocks with MaxBuffers==1")
919         }
920 }
921
922 // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
923 // leak.
924 func TestPutHandlerNoBufferleak(t *testing.T) {
925         defer teardown()
926
927         // Prepare two test Keep volumes.
928         KeepVM = MakeTestVolumeManager(2)
929         defer KeepVM.Close()
930
931         ok := make(chan bool)
932         go func() {
933                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
934                         // Unauthenticated request, no server key
935                         // => OK (unsigned response)
936                         unsignedLocator := "/" + TestHash
937                         response := IssueRequest(
938                                 &RequestTester{
939                                         method:      "PUT",
940                                         uri:         unsignedLocator,
941                                         requestBody: TestBlock,
942                                 })
943                         ExpectStatusCode(t,
944                                 "TestPutHandlerBufferleak", http.StatusOK, response)
945                         ExpectBody(t,
946                                 "TestPutHandlerBufferleak",
947                                 TestHashPutResp, response)
948                 }
949                 ok <- true
950         }()
951         select {
952         case <-time.After(20 * time.Second):
953                 // If the buffer pool leaks, the test goroutine hangs.
954                 t.Fatal("test did not finish, assuming pool leaked")
955         case <-ok:
956         }
957 }
958
// notifyingResponseRecorder is an httptest.ResponseRecorder that
// additionally exposes a CloseNotify method, backed by a channel the
// test controls.
type notifyingResponseRecorder struct {
	*httptest.ResponseRecorder

	// closer is the channel handed out by CloseNotify.
	closer chan bool
}

// CloseNotify returns the recorder's closer channel as a
// receive-only channel.
func (rec *notifyingResponseRecorder) CloseNotify() <-chan bool {
	return rec.closer
}
967
968 func TestGetHandlerClientDisconnect(t *testing.T) {
969         defer func(was bool) {
970                 theConfig.RequireSignatures = was
971         }(theConfig.RequireSignatures)
972         theConfig.RequireSignatures = false
973
974         defer func(orig *bufferPool) {
975                 bufs = orig
976         }(bufs)
977         bufs = newBufferPool(1, BlockSize)
978         defer bufs.Put(bufs.Get(BlockSize))
979
980         KeepVM = MakeTestVolumeManager(2)
981         defer KeepVM.Close()
982
983         if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
984                 t.Error(err)
985         }
986
987         resp := &notifyingResponseRecorder{
988                 ResponseRecorder: httptest.NewRecorder(),
989                 closer:           make(chan bool, 1),
990         }
991         if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
992                 t.Fatal("notifyingResponseRecorder is broken")
993         }
994         // If anyone asks, the client has disconnected.
995         resp.closer <- true
996
997         ok := make(chan struct{})
998         go func() {
999                 req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
1000                 MakeRESTRouter(testCluster).ServeHTTP(resp, req)
1001                 ok <- struct{}{}
1002         }()
1003
1004         select {
1005         case <-time.After(20 * time.Second):
1006                 t.Fatal("request took >20s, close notifier must be broken")
1007         case <-ok:
1008         }
1009
1010         ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
1011         for i, v := range KeepVM.AllWritable() {
1012                 if calls := v.(*MockVolume).called["GET"]; calls != 0 {
1013                         t.Errorf("volume %d got %d calls, expected 0", i, calls)
1014                 }
1015         }
1016 }
1017
1018 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
1019 // leak.
1020 func TestGetHandlerNoBufferLeak(t *testing.T) {
1021         defer teardown()
1022
1023         // Prepare two test Keep volumes. Our block is stored on the second volume.
1024         KeepVM = MakeTestVolumeManager(2)
1025         defer KeepVM.Close()
1026
1027         vols := KeepVM.AllWritable()
1028         if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
1029                 t.Error(err)
1030         }
1031
1032         ok := make(chan bool)
1033         go func() {
1034                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
1035                         // Unauthenticated request, unsigned locator
1036                         // => OK
1037                         unsignedLocator := "/" + TestHash
1038                         response := IssueRequest(
1039                                 &RequestTester{
1040                                         method: "GET",
1041                                         uri:    unsignedLocator,
1042                                 })
1043                         ExpectStatusCode(t,
1044                                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
1045                         ExpectBody(t,
1046                                 "Unauthenticated request, unsigned locator",
1047                                 string(TestBlock),
1048                                 response)
1049                 }
1050                 ok <- true
1051         }()
1052         select {
1053         case <-time.After(20 * time.Second):
1054                 // If the buffer pool leaks, the test goroutine hangs.
1055                 t.Fatal("test did not finish, assuming pool leaked")
1056         case <-ok:
1057         }
1058 }
1059
1060 func TestPutReplicationHeader(t *testing.T) {
1061         defer teardown()
1062
1063         KeepVM = MakeTestVolumeManager(2)
1064         defer KeepVM.Close()
1065
1066         resp := IssueRequest(&RequestTester{
1067                 method:      "PUT",
1068                 uri:         "/" + TestHash,
1069                 requestBody: TestBlock,
1070         })
1071         if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
1072                 t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
1073         }
1074 }
1075
1076 func TestUntrashHandler(t *testing.T) {
1077         defer teardown()
1078
1079         // Set up Keep volumes
1080         KeepVM = MakeTestVolumeManager(2)
1081         defer KeepVM.Close()
1082         vols := KeepVM.AllWritable()
1083         vols[0].Put(context.Background(), TestHash, TestBlock)
1084
1085         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1086
1087         // unauthenticatedReq => UnauthorizedError
1088         unauthenticatedReq := &RequestTester{
1089                 method: "PUT",
1090                 uri:    "/untrash/" + TestHash,
1091         }
1092         response := IssueRequest(unauthenticatedReq)
1093         ExpectStatusCode(t,
1094                 "Unauthenticated request",
1095                 UnauthorizedError.HTTPCode,
1096                 response)
1097
1098         // notDataManagerReq => UnauthorizedError
1099         notDataManagerReq := &RequestTester{
1100                 method:   "PUT",
1101                 uri:      "/untrash/" + TestHash,
1102                 apiToken: knownToken,
1103         }
1104
1105         response = IssueRequest(notDataManagerReq)
1106         ExpectStatusCode(t,
1107                 "Non-datamanager token",
1108                 UnauthorizedError.HTTPCode,
1109                 response)
1110
1111         // datamanagerWithBadHashReq => StatusBadRequest
1112         datamanagerWithBadHashReq := &RequestTester{
1113                 method:   "PUT",
1114                 uri:      "/untrash/thisisnotalocator",
1115                 apiToken: theConfig.systemAuthToken,
1116         }
1117         response = IssueRequest(datamanagerWithBadHashReq)
1118         ExpectStatusCode(t,
1119                 "Bad locator in untrash request",
1120                 http.StatusBadRequest,
1121                 response)
1122
1123         // datamanagerWrongMethodReq => StatusBadRequest
1124         datamanagerWrongMethodReq := &RequestTester{
1125                 method:   "GET",
1126                 uri:      "/untrash/" + TestHash,
1127                 apiToken: theConfig.systemAuthToken,
1128         }
1129         response = IssueRequest(datamanagerWrongMethodReq)
1130         ExpectStatusCode(t,
1131                 "Only PUT method is supported for untrash",
1132                 http.StatusMethodNotAllowed,
1133                 response)
1134
1135         // datamanagerReq => StatusOK
1136         datamanagerReq := &RequestTester{
1137                 method:   "PUT",
1138                 uri:      "/untrash/" + TestHash,
1139                 apiToken: theConfig.systemAuthToken,
1140         }
1141         response = IssueRequest(datamanagerReq)
1142         ExpectStatusCode(t,
1143                 "",
1144                 http.StatusOK,
1145                 response)
1146         expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
1147         if response.Body.String() != expected {
1148                 t.Errorf(
1149                         "Untrash response mismatched: expected %s, got:\n%s",
1150                         expected, response.Body.String())
1151         }
1152 }
1153
1154 func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
1155         defer teardown()
1156
1157         // Set up readonly Keep volumes
1158         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
1159         vols[0].Readonly = true
1160         vols[1].Readonly = true
1161         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
1162         defer KeepVM.Close()
1163
1164         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1165
1166         // datamanagerReq => StatusOK
1167         datamanagerReq := &RequestTester{
1168                 method:   "PUT",
1169                 uri:      "/untrash/" + TestHash,
1170                 apiToken: theConfig.systemAuthToken,
1171         }
1172         response := IssueRequest(datamanagerReq)
1173         ExpectStatusCode(t,
1174                 "No writable volumes",
1175                 http.StatusNotFound,
1176                 response)
1177 }
1178
1179 func TestHealthCheckPing(t *testing.T) {
1180         theConfig.ManagementToken = arvadostest.ManagementToken
1181         pingReq := &RequestTester{
1182                 method:   "GET",
1183                 uri:      "/_health/ping",
1184                 apiToken: arvadostest.ManagementToken,
1185         }
1186         response := IssueHealthCheckRequest(pingReq)
1187         ExpectStatusCode(t,
1188                 "",
1189                 http.StatusOK,
1190                 response)
1191         want := `{"health":"OK"}`
1192         if !strings.Contains(response.Body.String(), want) {
1193                 t.Errorf("expected response to include %s: got %s", want, response.Body.String())
1194         }
1195 }