1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Tests for Keep HTTP handlers:
11 // The HTTP handlers are responsible for enforcing permission policy,
12 // so these tests must exercise all possible permission permutations.
29 "git.curoverse.com/arvados.git/sdk/go/arvados"
32 // A RequestTester represents the parameters for an HTTP request to
33 // be issued on behalf of a unit test.
34 type RequestTester struct {
41 // Test GetBlockHandler on the following situations:
42 // - permissions off, unauthenticated request, unsigned locator
43 // - permissions on, authenticated request, signed locator
44 // - permissions on, authenticated request, unsigned locator
45 // - permissions on, unauthenticated request, signed locator
46 // - permissions on, authenticated request, expired locator
48 func TestGetHandler(t *testing.T) {
51 // Prepare two test Keep volumes. Our block is stored on the first writable volume.
52 KeepVM = MakeTestVolumeManager(2)
55 vols := KeepVM.AllWritable()
56 if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
60 // Create locators for testing.
61 // Turn on permission settings so we can generate signed locators.
62 theConfig.RequireSignatures = true
63 theConfig.blobSigningKey = []byte(knownKey)
64 theConfig.BlobSignatureTTL.Set("5m")
67 unsignedLocator = "/" + TestHash
68 validTimestamp = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
69 expiredTimestamp = time.Now().Add(-time.Hour)
70 signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp)
71 expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
75 // Test unauthenticated request with permissions off.
76 theConfig.RequireSignatures = false
78 // Unauthenticated request, unsigned locator
80 response := IssueRequest(
86 "Unauthenticated request, unsigned locator", http.StatusOK, response)
88 "Unauthenticated request, unsigned locator",
92 receivedLen := response.Header().Get("Content-Length")
93 expectedLen := fmt.Sprintf("%d", len(TestBlock))
94 if receivedLen != expectedLen {
95 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
100 theConfig.RequireSignatures = true
102 // Authenticated request, signed locator
104 response = IssueRequest(&RequestTester{
107 apiToken: knownToken,
110 "Authenticated request, signed locator", http.StatusOK, response)
112 "Authenticated request, signed locator", string(TestBlock), response)
114 receivedLen = response.Header().Get("Content-Length")
115 expectedLen = fmt.Sprintf("%d", len(TestBlock))
116 if receivedLen != expectedLen {
117 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
120 // Authenticated request, unsigned locator
121 // => PermissionError
122 response = IssueRequest(&RequestTester{
124 uri: unsignedLocator,
125 apiToken: knownToken,
127 ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
129 // Unauthenticated request, signed locator
130 // => PermissionError
131 response = IssueRequest(&RequestTester{
136 "Unauthenticated request, signed locator",
137 PermissionError.HTTPCode, response)
139 // Authenticated request, expired locator
141 response = IssueRequest(&RequestTester{
144 apiToken: knownToken,
147 "Authenticated request, expired locator",
148 ExpiredError.HTTPCode, response)
151 // Test PutBlockHandler on the following situations:
153 // - with server key, authenticated request, unsigned locator
154 // - with server key, unauthenticated request, unsigned locator
156 func TestPutHandler(t *testing.T) {
159 // Prepare two test Keep volumes.
160 KeepVM = MakeTestVolumeManager(2)
166 // Unauthenticated request, no server key
167 // => OK (unsigned response)
168 unsignedLocator := "/" + TestHash
169 response := IssueRequest(
172 uri: unsignedLocator,
173 requestBody: TestBlock,
177 "Unauthenticated request, no server key", http.StatusOK, response)
179 "Unauthenticated request, no server key",
180 TestHashPutResp, response)
182 // ------------------
183 // With a server key.
185 theConfig.blobSigningKey = []byte(knownKey)
186 theConfig.BlobSignatureTTL.Set("5m")
188 // When a permission key is available, the locator returned
189 // from an authenticated PUT request will be signed.
191 // Authenticated PUT, signed locator
192 // => OK (signed response)
193 response = IssueRequest(
196 uri: unsignedLocator,
197 requestBody: TestBlock,
198 apiToken: knownToken,
202 "Authenticated PUT, signed locator, with server key",
203 http.StatusOK, response)
204 responseLocator := strings.TrimSpace(response.Body.String())
205 if VerifySignature(responseLocator, knownToken) != nil {
206 t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
207 "response '%s' does not contain a valid signature",
211 // Unauthenticated PUT, unsigned locator
213 response = IssueRequest(
216 uri: unsignedLocator,
217 requestBody: TestBlock,
221 "Unauthenticated PUT, unsigned locator, with server key",
222 http.StatusOK, response)
224 "Unauthenticated PUT, unsigned locator, with server key",
225 TestHashPutResp, response)
228 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
230 theConfig.systemAuthToken = "fake-data-manager-token"
231 vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
232 vols[0].Readonly = true
233 KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
239 requestBody: TestBlock,
241 defer func(orig bool) {
242 theConfig.EnableDelete = orig
243 }(theConfig.EnableDelete)
244 theConfig.EnableDelete = true
249 requestBody: TestBlock,
250 apiToken: theConfig.systemAuthToken,
257 for _, e := range []expect{
269 if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
270 t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
275 // Test /index requests:
276 // - unauthenticated /index request
277 // - unauthenticated /index/prefix request
278 // - authenticated /index request | non-superuser
279 // - authenticated /index/prefix request | non-superuser
280 // - authenticated /index request | superuser
281 // - authenticated /index/prefix request | superuser
283 // The only /index requests that should succeed are those issued by the
284 // superuser. They should pass regardless of the value of RequireSignatures.
286 func TestIndexHandler(t *testing.T) {
289 // Set up Keep volumes and populate them.
290 // Include multiple blocks on different volumes, and
291 // some metadata files (which should be omitted from index listings)
292 KeepVM = MakeTestVolumeManager(2)
295 vols := KeepVM.AllWritable()
296 vols[0].Put(context.Background(), TestHash, TestBlock)
297 vols[1].Put(context.Background(), TestHash2, TestBlock2)
298 vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
299 vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
301 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
303 unauthenticatedReq := &RequestTester{
307 authenticatedReq := &RequestTester{
310 apiToken: knownToken,
312 superuserReq := &RequestTester{
315 apiToken: theConfig.systemAuthToken,
317 unauthPrefixReq := &RequestTester{
319 uri: "/index/" + TestHash[0:3],
321 authPrefixReq := &RequestTester{
323 uri: "/index/" + TestHash[0:3],
324 apiToken: knownToken,
326 superuserPrefixReq := &RequestTester{
328 uri: "/index/" + TestHash[0:3],
329 apiToken: theConfig.systemAuthToken,
331 superuserNoSuchPrefixReq := &RequestTester{
334 apiToken: theConfig.systemAuthToken,
336 superuserInvalidPrefixReq := &RequestTester{
339 apiToken: theConfig.systemAuthToken,
342 // -------------------------------------------------------------
343 // Only the superuser should be allowed to issue /index requests.
345 // ---------------------------
346 // RequireSignatures enabled
347 // This setting should not affect tests passing.
348 theConfig.RequireSignatures = true
350 // unauthenticated /index request
351 // => UnauthorizedError
352 response := IssueRequest(unauthenticatedReq)
354 "RequireSignatures on, unauthenticated request",
355 UnauthorizedError.HTTPCode,
358 // unauthenticated /index/prefix request
359 // => UnauthorizedError
360 response = IssueRequest(unauthPrefixReq)
362 "permissions on, unauthenticated /index/prefix request",
363 UnauthorizedError.HTTPCode,
366 // authenticated /index request, non-superuser
367 // => UnauthorizedError
368 response = IssueRequest(authenticatedReq)
370 "permissions on, authenticated request, non-superuser",
371 UnauthorizedError.HTTPCode,
374 // authenticated /index/prefix request, non-superuser
375 // => UnauthorizedError
376 response = IssueRequest(authPrefixReq)
378 "permissions on, authenticated /index/prefix request, non-superuser",
379 UnauthorizedError.HTTPCode,
382 // superuser /index request
384 response = IssueRequest(superuserReq)
386 "permissions on, superuser request",
390 // ----------------------------
391 // RequireSignatures disabled
392 // Valid Request should still pass.
393 theConfig.RequireSignatures = false
395 // superuser /index request
397 response = IssueRequest(superuserReq)
399 "permissions on, superuser request",
403 expected := `^` + TestHash + `\+\d+ \d+\n` +
404 TestHash2 + `\+\d+ \d+\n\n$`
405 match, _ := regexp.MatchString(expected, response.Body.String())
408 "permissions on, superuser request: expected %s, got:\n%s",
409 expected, response.Body.String())
412 // superuser /index/prefix request
414 response = IssueRequest(superuserPrefixReq)
416 "permissions on, superuser request",
420 expected = `^` + TestHash + `\+\d+ \d+\n\n$`
421 match, _ = regexp.MatchString(expected, response.Body.String())
424 "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
425 expected, response.Body.String())
428 // superuser /index/{no-such-prefix} request
430 response = IssueRequest(superuserNoSuchPrefixReq)
432 "permissions on, superuser request",
436 if "\n" != response.Body.String() {
437 t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
440 // superuser /index/{invalid-prefix} request
441 // => StatusBadRequest
442 response = IssueRequest(superuserInvalidPrefixReq)
444 "permissions on, superuser request",
445 http.StatusBadRequest,
453 // With no token and with a non-data-manager token:
454 // * Delete existing block
455 // (test for 403 Forbidden, confirm block not deleted)
457 // With data manager token:
459 // * Delete existing block
460 // (test for 200 OK, response counts, confirm block deleted)
462 // * Delete nonexistent block
463 // (test for 200 OK, response counts)
467 // * Delete block on read-only and read-write volume
468 // (test for 200 OK, response with copies_deleted=1,
469 // copies_failed=1, confirm block deleted only on r/w volume)
471 // * Delete block on read-only volume only
472 // (test for 200 OK, response with copies_deleted=0, copies_failed=1,
473 // confirm block not deleted)
475 func TestDeleteHandler(t *testing.T) {
478 // Set up Keep volumes and populate them.
479 // Include multiple blocks on different volumes, and
480 // some metadata files (which should be omitted from index listings)
481 KeepVM = MakeTestVolumeManager(2)
484 vols := KeepVM.AllWritable()
485 vols[0].Put(context.Background(), TestHash, TestBlock)
487 // Explicitly set the BlobSignatureTTL to 0 for these
488 // tests, to ensure the MockVolume deletes the blocks
489 // even though they have just been created.
490 theConfig.BlobSignatureTTL = arvados.Duration(0)
492 var userToken = "NOT DATA MANAGER TOKEN"
493 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
495 theConfig.EnableDelete = true
497 unauthReq := &RequestTester{
502 userReq := &RequestTester{
508 superuserExistingBlockReq := &RequestTester{
511 apiToken: theConfig.systemAuthToken,
514 superuserNonexistentBlockReq := &RequestTester{
516 uri: "/" + TestHash2,
517 apiToken: theConfig.systemAuthToken,
520 // Unauthenticated request returns PermissionError.
521 var response *httptest.ResponseRecorder
522 response = IssueRequest(unauthReq)
524 "unauthenticated request",
525 PermissionError.HTTPCode,
528 // Authenticated non-admin request returns PermissionError.
529 response = IssueRequest(userReq)
531 "authenticated non-admin request",
532 PermissionError.HTTPCode,
535 // Authenticated admin request for nonexistent block.
536 type deletecounter struct {
537 Deleted int `json:"copies_deleted"`
538 Failed int `json:"copies_failed"`
540 var responseDc, expectedDc deletecounter
542 response = IssueRequest(superuserNonexistentBlockReq)
544 "data manager request, nonexistent block",
548 // Authenticated admin request for existing block while EnableDelete is false.
549 theConfig.EnableDelete = false
550 response = IssueRequest(superuserExistingBlockReq)
552 "authenticated request, existing block, method disabled",
553 MethodDisabledError.HTTPCode,
555 theConfig.EnableDelete = true
557 // Authenticated admin request for existing block.
558 response = IssueRequest(superuserExistingBlockReq)
560 "data manager request, existing block",
563 // Expect response {"copies_deleted":1,"copies_failed":0}
564 expectedDc = deletecounter{1, 0}
565 json.NewDecoder(response.Body).Decode(&responseDc)
566 if responseDc != expectedDc {
567 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
568 expectedDc, responseDc)
570 // Confirm the block has been deleted
571 buf := make([]byte, BlockSize)
572 _, err := vols[0].Get(context.Background(), TestHash, buf)
573 var blockDeleted = os.IsNotExist(err)
575 t.Error("superuserExistingBlockReq: block not deleted")
578 // A DELETE request on a block newer than BlobSignatureTTL
579 // should return success but leave the block on the volume.
580 vols[0].Put(context.Background(), TestHash, TestBlock)
581 theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
583 response = IssueRequest(superuserExistingBlockReq)
585 "data manager request, existing block",
588 // Expect response {"copies_deleted":1,"copies_failed":0}
589 expectedDc = deletecounter{1, 0}
590 json.NewDecoder(response.Body).Decode(&responseDc)
591 if responseDc != expectedDc {
592 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
593 expectedDc, responseDc)
595 // Confirm the block has NOT been deleted.
596 _, err = vols[0].Get(context.Background(), TestHash, buf)
598 t.Errorf("testing delete on new block: %s\n", err)
604 // Test handling of the PUT /pull request.
606 // Cases tested: syntactically valid and invalid pull lists, from the
607 // data manager and from unprivileged users:
609 // 1. Valid pull list from an ordinary user
610 // (expected result: 401 Unauthorized)
612 // 2. Invalid pull request from an ordinary user
613 // (expected result: 401 Unauthorized)
615 // 3. Valid pull request from the data manager
616 // (expected result: 200 OK with response body "Received 3 pull
619 // 4. Invalid pull request from the data manager
620 // (expected result: 400 Bad Request)
622 // Test that in the end, the pull manager received a good pull list with
623 // the expected number of requests.
625 // TODO(twp): test concurrency: launch 100 goroutines to update the
626 // pull list simultaneously. Make sure that none of them return 400
627 // Bad Request and that pullq.GetList() returns a valid list.
629 func TestPullHandler(t *testing.T) {
632 var userToken = "USER TOKEN"
633 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
635 pullq = NewWorkQueue()
637 goodJSON := []byte(`[
639 "locator":"locator_with_two_servers",
646 "locator":"locator_with_no_servers",
651 "servers":["empty_locator"]
655 badJSON := []byte(`{ "key":"I'm a little teapot" }`)
657 type pullTest struct {
663 var testcases = []pullTest{
665 "Valid pull list from an ordinary user",
666 RequestTester{"/pull", userToken, "PUT", goodJSON},
667 http.StatusUnauthorized,
671 "Invalid pull request from an ordinary user",
672 RequestTester{"/pull", userToken, "PUT", badJSON},
673 http.StatusUnauthorized,
677 "Valid pull request from the data manager",
678 RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
680 "Received 3 pull requests\n",
683 "Invalid pull request from the data manager",
684 RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
685 http.StatusBadRequest,
690 for _, tst := range testcases {
691 response := IssueRequest(&tst.req)
692 ExpectStatusCode(t, tst.name, tst.responseCode, response)
693 ExpectBody(t, tst.name, tst.responseBody, response)
696 // The Keep pull manager should have received one good list with 3
698 for i := 0; i < 3; i++ {
699 item := <-pullq.NextItem
700 if _, ok := item.(PullRequest); !ok {
701 t.Errorf("item %v could not be parsed as a PullRequest", item)
705 expectChannelEmpty(t, pullq.NextItem)
712 // Cases tested: syntactically valid and invalid trash lists, from the
713 // data manager and from unprivileged users:
715 // 1. Valid trash list from an ordinary user
716 // (expected result: 401 Unauthorized)
718 // 2. Invalid trash list from an ordinary user
719 // (expected result: 401 Unauthorized)
721 // 3. Valid trash list from the data manager
722 // (expected result: 200 OK with response body "Received 3 trash
725 // 4. Invalid trash list from the data manager
726 // (expected result: 400 Bad Request)
728 // Test that in the end, the trash collector received a good
729 // trash list with the expected number of requests.
731 // TODO(twp): test concurrency: launch 100 goroutines to update the
732 // trash list simultaneously. Make sure that none of them return 400
733 // Bad Request and that replica.Dump() returns a valid list.
735 func TestTrashHandler(t *testing.T) {
738 var userToken = "USER TOKEN"
739 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
741 trashq = NewWorkQueue()
743 goodJSON := []byte(`[
746 "block_mtime":1409082153
750 "block_mtime":1409082153
754 "block_mtime":1409082153
758 badJSON := []byte(`I am not a valid JSON string`)
760 type trashTest struct {
767 var testcases = []trashTest{
769 "Valid trash list from an ordinary user",
770 RequestTester{"/trash", userToken, "PUT", goodJSON},
771 http.StatusUnauthorized,
775 "Invalid trash list from an ordinary user",
776 RequestTester{"/trash", userToken, "PUT", badJSON},
777 http.StatusUnauthorized,
781 "Valid trash list from the data manager",
782 RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
784 "Received 3 trash requests\n",
787 "Invalid trash list from the data manager",
788 RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
789 http.StatusBadRequest,
794 for _, tst := range testcases {
795 response := IssueRequest(&tst.req)
796 ExpectStatusCode(t, tst.name, tst.responseCode, response)
797 ExpectBody(t, tst.name, tst.responseBody, response)
800 // The trash collector should have received one good list with 3
802 for i := 0; i < 3; i++ {
803 item := <-trashq.NextItem
804 if _, ok := item.(TrashRequest); !ok {
805 t.Errorf("item %v could not be parsed as a TrashRequest", item)
809 expectChannelEmpty(t, trashq.NextItem)
812 // ====================
814 // ====================
816 // IssueRequest executes an HTTP request described by rt, to a
817 // REST router. It returns the HTTP response to the request.
818 func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
819 response := httptest.NewRecorder()
820 body := bytes.NewReader(rt.requestBody)
821 req, _ := http.NewRequest(rt.method, rt.uri, body)
822 if rt.apiToken != "" {
823 req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
825 loggingRouter := MakeRESTRouter()
826 loggingRouter.ServeHTTP(response, req)
830 // ExpectStatusCode checks whether a response has the specified status code,
831 // and reports a test failure if not.
832 func ExpectStatusCode(
836 response *httptest.ResponseRecorder) {
837 if response.Code != expectedStatus {
838 t.Errorf("%s: expected status %d, got %+v",
839 testname, expectedStatus, response)
847 response *httptest.ResponseRecorder) {
848 if expectedBody != "" && response.Body.String() != expectedBody {
849 t.Errorf("%s: expected response body '%s', got %+v",
850 testname, expectedBody, response)
855 func TestPutNeedsOnlyOneBuffer(t *testing.T) {
857 KeepVM = MakeTestVolumeManager(1)
860 defer func(orig *bufferPool) {
863 bufs = newBufferPool(1, BlockSize)
865 ok := make(chan struct{})
867 for i := 0; i < 2; i++ {
868 response := IssueRequest(
872 requestBody: TestBlock,
875 "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
882 case <-time.After(time.Second):
883 t.Fatal("PUT deadlocks with MaxBuffers==1")
887 // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
889 func TestPutHandlerNoBufferleak(t *testing.T) {
892 // Prepare two test Keep volumes.
893 KeepVM = MakeTestVolumeManager(2)
896 ok := make(chan bool)
898 for i := 0; i < theConfig.MaxBuffers+1; i++ {
899 // Unauthenticated request, no server key
900 // => OK (unsigned response)
901 unsignedLocator := "/" + TestHash
902 response := IssueRequest(
905 uri: unsignedLocator,
906 requestBody: TestBlock,
909 "TestPutHandlerBufferleak", http.StatusOK, response)
911 "TestPutHandlerBufferleak",
912 TestHashPutResp, response)
917 case <-time.After(20 * time.Second):
918 // If the buffer pool leaks, the test goroutine hangs.
919 t.Fatal("test did not finish, assuming pool leaked")
924 type notifyingResponseRecorder struct {
925 *httptest.ResponseRecorder
929 func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
933 func TestGetHandlerClientDisconnect(t *testing.T) {
934 defer func(was bool) {
935 theConfig.RequireSignatures = was
936 }(theConfig.RequireSignatures)
937 theConfig.RequireSignatures = false
939 defer func(orig *bufferPool) {
942 bufs = newBufferPool(1, BlockSize)
943 defer bufs.Put(bufs.Get(BlockSize))
945 KeepVM = MakeTestVolumeManager(2)
948 if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
952 resp := &notifyingResponseRecorder{
953 ResponseRecorder: httptest.NewRecorder(),
954 closer: make(chan bool, 1),
956 if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
957 t.Fatal("notifyingResponseRecorder is broken")
959 // If anyone asks, the client has disconnected.
962 ok := make(chan struct{})
964 req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
965 (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
970 case <-time.After(20 * time.Second):
971 t.Fatal("request took >20s, close notifier must be broken")
975 ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
976 for i, v := range KeepVM.AllWritable() {
977 if calls := v.(*MockVolume).called["GET"]; calls != 0 {
978 t.Errorf("volume %d got %d calls, expected 0", i, calls)
983 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
985 func TestGetHandlerNoBufferLeak(t *testing.T) {
988 // Prepare two test Keep volumes. Our block is stored on the first writable volume.
989 KeepVM = MakeTestVolumeManager(2)
992 vols := KeepVM.AllWritable()
993 if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
997 ok := make(chan bool)
999 for i := 0; i < theConfig.MaxBuffers+1; i++ {
1000 // Unauthenticated request, unsigned locator
1002 unsignedLocator := "/" + TestHash
1003 response := IssueRequest(
1006 uri: unsignedLocator,
1009 "Unauthenticated request, unsigned locator", http.StatusOK, response)
1011 "Unauthenticated request, unsigned locator",
1018 case <-time.After(20 * time.Second):
1019 // If the buffer pool leaks, the test goroutine hangs.
1020 t.Fatal("test did not finish, assuming pool leaked")
1025 func TestPutReplicationHeader(t *testing.T) {
1028 KeepVM = MakeTestVolumeManager(2)
1029 defer KeepVM.Close()
1031 resp := IssueRequest(&RequestTester{
1033 uri: "/" + TestHash,
1034 requestBody: TestBlock,
1036 if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
1037 t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
1041 func TestUntrashHandler(t *testing.T) {
1044 // Set up Keep volumes
1045 KeepVM = MakeTestVolumeManager(2)
1046 defer KeepVM.Close()
1047 vols := KeepVM.AllWritable()
1048 vols[0].Put(context.Background(), TestHash, TestBlock)
1050 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1052 // unauthenticatedReq => UnauthorizedError
1053 unauthenticatedReq := &RequestTester{
1055 uri: "/untrash/" + TestHash,
1057 response := IssueRequest(unauthenticatedReq)
1059 "Unauthenticated request",
1060 UnauthorizedError.HTTPCode,
1063 // notDataManagerReq => UnauthorizedError
1064 notDataManagerReq := &RequestTester{
1066 uri: "/untrash/" + TestHash,
1067 apiToken: knownToken,
1070 response = IssueRequest(notDataManagerReq)
1072 "Non-datamanager token",
1073 UnauthorizedError.HTTPCode,
1076 // datamanagerWithBadHashReq => StatusBadRequest
1077 datamanagerWithBadHashReq := &RequestTester{
1079 uri: "/untrash/thisisnotalocator",
1080 apiToken: theConfig.systemAuthToken,
1082 response = IssueRequest(datamanagerWithBadHashReq)
1084 "Bad locator in untrash request",
1085 http.StatusBadRequest,
1088 // datamanagerWrongMethodReq => StatusBadRequest
1089 datamanagerWrongMethodReq := &RequestTester{
1091 uri: "/untrash/" + TestHash,
1092 apiToken: theConfig.systemAuthToken,
1094 response = IssueRequest(datamanagerWrongMethodReq)
1096 "Only PUT method is supported for untrash",
1097 http.StatusBadRequest,
1100 // datamanagerReq => StatusOK
1101 datamanagerReq := &RequestTester{
1103 uri: "/untrash/" + TestHash,
1104 apiToken: theConfig.systemAuthToken,
1106 response = IssueRequest(datamanagerReq)
1111 expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
1112 if response.Body.String() != expected {
1114 "Untrash response mismatched: expected %s, got:\n%s",
1115 expected, response.Body.String())
1119 func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
1122 // Set up readonly Keep volumes
1123 vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
1124 vols[0].Readonly = true
1125 vols[1].Readonly = true
1126 KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
1127 defer KeepVM.Close()
1129 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1131 // datamanagerReq => StatusNotFound
1132 datamanagerReq := &RequestTester{
1134 uri: "/untrash/" + TestHash,
1135 apiToken: theConfig.systemAuthToken,
1137 response := IssueRequest(datamanagerReq)
1139 "No writable volumes",
1140 http.StatusNotFound,