1 // Tests for Keep HTTP handlers:
7 // The HTTP handlers are responsible for enforcing permission policy,
8 // so these tests must exercise all possible permission permutations.
25 "git.curoverse.com/arvados.git/sdk/go/arvados"
28 // A RequestTester represents the parameters for an HTTP request to
29 // be issued on behalf of a unit test.
30 type RequestTester struct {
37 // Test GetBlockHandler in the following situations:
38 // - permissions off, unauthenticated request, unsigned locator
39 // - permissions on, authenticated request, signed locator
40 // - permissions on, authenticated request, unsigned locator
41 // - permissions on, unauthenticated request, signed locator
42 // - permissions on, authenticated request, expired locator
44 func TestGetHandler(t *testing.T) {
47 // Prepare two test Keep volumes. Our block is stored on the second volume.
48 KeepVM = MakeTestVolumeManager(2)
51 vols := KeepVM.AllWritable()
52 if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
56 // Create locators for testing.
57 // Turn on permission settings so we can generate signed locators.
58 theConfig.RequireSignatures = true
59 theConfig.blobSigningKey = []byte(knownKey)
60 theConfig.BlobSignatureTTL.Set("5m")
63 unsignedLocator = "/" + TestHash
64 validTimestamp = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
65 expiredTimestamp = time.Now().Add(-time.Hour)
66 signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp)
67 expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
71 // Test unauthenticated request with permissions off.
72 theConfig.RequireSignatures = false
74 // Unauthenticated request, unsigned locator
76 response := IssueRequest(
82 "Unauthenticated request, unsigned locator", http.StatusOK, response)
84 "Unauthenticated request, unsigned locator",
88 receivedLen := response.Header().Get("Content-Length")
89 expectedLen := fmt.Sprintf("%d", len(TestBlock))
90 if receivedLen != expectedLen {
91 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
96 theConfig.RequireSignatures = true
98 // Authenticated request, signed locator
100 response = IssueRequest(&RequestTester{
103 apiToken: knownToken,
106 "Authenticated request, signed locator", http.StatusOK, response)
108 "Authenticated request, signed locator", string(TestBlock), response)
110 receivedLen = response.Header().Get("Content-Length")
111 expectedLen = fmt.Sprintf("%d", len(TestBlock))
112 if receivedLen != expectedLen {
113 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
116 // Authenticated request, unsigned locator
117 // => PermissionError
118 response = IssueRequest(&RequestTester{
120 uri: unsignedLocator,
121 apiToken: knownToken,
123 ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
125 // Unauthenticated request, signed locator
126 // => PermissionError
127 response = IssueRequest(&RequestTester{
132 "Unauthenticated request, signed locator",
133 PermissionError.HTTPCode, response)
135 // Authenticated request, expired locator
137 response = IssueRequest(&RequestTester{
140 apiToken: knownToken,
143 "Authenticated request, expired locator",
144 ExpiredError.HTTPCode, response)
147 // Test PutBlockHandler in the following situations:
149 // - with server key, authenticated request, unsigned locator
150 // - with server key, unauthenticated request, unsigned locator
152 func TestPutHandler(t *testing.T) {
155 // Prepare two test Keep volumes.
156 KeepVM = MakeTestVolumeManager(2)
162 // Unauthenticated request, no server key
163 // => OK (unsigned response)
164 unsignedLocator := "/" + TestHash
165 response := IssueRequest(
168 uri: unsignedLocator,
169 requestBody: TestBlock,
173 "Unauthenticated request, no server key", http.StatusOK, response)
175 "Unauthenticated request, no server key",
176 TestHashPutResp, response)
178 // ------------------
179 // With a server key.
181 theConfig.blobSigningKey = []byte(knownKey)
182 theConfig.BlobSignatureTTL.Set("5m")
184 // When a permission key is available, the locator returned
185 // from an authenticated PUT request will be signed.
187 // Authenticated PUT, signed locator
188 // => OK (signed response)
189 response = IssueRequest(
192 uri: unsignedLocator,
193 requestBody: TestBlock,
194 apiToken: knownToken,
198 "Authenticated PUT, signed locator, with server key",
199 http.StatusOK, response)
200 responseLocator := strings.TrimSpace(response.Body.String())
201 if VerifySignature(responseLocator, knownToken) != nil {
202 t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
203 "response '%s' does not contain a valid signature",
207 // Unauthenticated PUT, unsigned locator
209 response = IssueRequest(
212 uri: unsignedLocator,
213 requestBody: TestBlock,
217 "Unauthenticated PUT, unsigned locator, with server key",
218 http.StatusOK, response)
220 "Unauthenticated PUT, unsigned locator, with server key",
221 TestHashPutResp, response)
224 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
226 theConfig.systemAuthToken = "fake-data-manager-token"
227 vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
228 vols[0].Readonly = true
229 KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
235 requestBody: TestBlock,
237 defer func(orig bool) {
238 theConfig.EnableDelete = orig
239 }(theConfig.EnableDelete)
240 theConfig.EnableDelete = true
245 requestBody: TestBlock,
246 apiToken: theConfig.systemAuthToken,
253 for _, e := range []expect{
265 if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
266 t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
271 // Test /index requests:
272 // - unauthenticated /index request
273 // - unauthenticated /index/prefix request
274 // - authenticated /index request | non-superuser
275 // - authenticated /index/prefix request | non-superuser
276 // - authenticated /index request | superuser
277 // - authenticated /index/prefix request | superuser
279 // The only /index requests that should succeed are those issued by the
280 // superuser. They should pass regardless of the value of RequireSignatures.
282 func TestIndexHandler(t *testing.T) {
285 // Set up Keep volumes and populate them.
286 // Include multiple blocks on different volumes, and
287 // some metadata files (which should be omitted from index listings)
288 KeepVM = MakeTestVolumeManager(2)
291 vols := KeepVM.AllWritable()
292 vols[0].Put(context.Background(), TestHash, TestBlock)
293 vols[1].Put(context.Background(), TestHash2, TestBlock2)
294 vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
295 vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
297 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
299 unauthenticatedReq := &RequestTester{
303 authenticatedReq := &RequestTester{
306 apiToken: knownToken,
308 superuserReq := &RequestTester{
311 apiToken: theConfig.systemAuthToken,
313 unauthPrefixReq := &RequestTester{
315 uri: "/index/" + TestHash[0:3],
317 authPrefixReq := &RequestTester{
319 uri: "/index/" + TestHash[0:3],
320 apiToken: knownToken,
322 superuserPrefixReq := &RequestTester{
324 uri: "/index/" + TestHash[0:3],
325 apiToken: theConfig.systemAuthToken,
327 superuserNoSuchPrefixReq := &RequestTester{
330 apiToken: theConfig.systemAuthToken,
332 superuserInvalidPrefixReq := &RequestTester{
335 apiToken: theConfig.systemAuthToken,
338 // -------------------------------------------------------------
339 // Only the superuser should be allowed to issue /index requests.
341 // ---------------------------
342 // RequireSignatures enabled
343 // This setting should not affect tests passing.
344 theConfig.RequireSignatures = true
346 // unauthenticated /index request
347 // => UnauthorizedError
348 response := IssueRequest(unauthenticatedReq)
350 "RequireSignatures on, unauthenticated request",
351 UnauthorizedError.HTTPCode,
354 // unauthenticated /index/prefix request
355 // => UnauthorizedError
356 response = IssueRequest(unauthPrefixReq)
358 "permissions on, unauthenticated /index/prefix request",
359 UnauthorizedError.HTTPCode,
362 // authenticated /index request, non-superuser
363 // => UnauthorizedError
364 response = IssueRequest(authenticatedReq)
366 "permissions on, authenticated request, non-superuser",
367 UnauthorizedError.HTTPCode,
370 // authenticated /index/prefix request, non-superuser
371 // => UnauthorizedError
372 response = IssueRequest(authPrefixReq)
374 "permissions on, authenticated /index/prefix request, non-superuser",
375 UnauthorizedError.HTTPCode,
378 // superuser /index request
380 response = IssueRequest(superuserReq)
382 "permissions on, superuser request",
386 // ----------------------------
387 // RequireSignatures disabled
388 // Valid Request should still pass.
389 theConfig.RequireSignatures = false
391 // superuser /index request
393 response = IssueRequest(superuserReq)
395 "permissions on, superuser request",
399 expected := `^` + TestHash + `\+\d+ \d+\n` +
400 TestHash2 + `\+\d+ \d+\n\n$`
401 match, _ := regexp.MatchString(expected, response.Body.String())
404 "permissions on, superuser request: expected %s, got:\n%s",
405 expected, response.Body.String())
408 // superuser /index/prefix request
410 response = IssueRequest(superuserPrefixReq)
412 "permissions on, superuser request",
416 expected = `^` + TestHash + `\+\d+ \d+\n\n$`
417 match, _ = regexp.MatchString(expected, response.Body.String())
420 "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
421 expected, response.Body.String())
424 // superuser /index/{no-such-prefix} request
426 response = IssueRequest(superuserNoSuchPrefixReq)
428 "permissions on, superuser request",
432 if "\n" != response.Body.String() {
433 t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
436 // superuser /index/{invalid-prefix} request
437 // => StatusBadRequest
438 response = IssueRequest(superuserInvalidPrefixReq)
440 "permissions on, superuser request",
441 http.StatusBadRequest,
449 // With no token and with a non-data-manager token:
450 // * Delete existing block
451 // (test for 403 Forbidden, confirm block not deleted)
453 // With data manager token:
455 // * Delete existing block
456 // (test for 200 OK, response counts, confirm block deleted)
458 // * Delete nonexistent block
459 // (test for 200 OK, response counts)
463 // * Delete block on read-only and read-write volume
464 // (test for 200 OK, response with copies_deleted=1,
465 // copies_failed=1, confirm block deleted only on r/w volume)
467 // * Delete block on read-only volume only
468 // (test for 200 OK, response with copies_deleted=0, copies_failed=1,
469 // confirm block not deleted)
471 func TestDeleteHandler(t *testing.T) {
474 // Set up Keep volumes and populate them.
475 // Include multiple blocks on different volumes, and
476 // some metadata files (which should be omitted from index listings)
477 KeepVM = MakeTestVolumeManager(2)
480 vols := KeepVM.AllWritable()
481 vols[0].Put(context.Background(), TestHash, TestBlock)
483 // Explicitly set the BlobSignatureTTL to 0 for these
484 // tests, to ensure the MockVolume deletes the blocks
485 // even though they have just been created.
486 theConfig.BlobSignatureTTL = arvados.Duration(0)
488 var userToken = "NOT DATA MANAGER TOKEN"
489 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
491 theConfig.EnableDelete = true
493 unauthReq := &RequestTester{
498 userReq := &RequestTester{
504 superuserExistingBlockReq := &RequestTester{
507 apiToken: theConfig.systemAuthToken,
510 superuserNonexistentBlockReq := &RequestTester{
512 uri: "/" + TestHash2,
513 apiToken: theConfig.systemAuthToken,
516 // Unauthenticated request returns PermissionError.
517 var response *httptest.ResponseRecorder
518 response = IssueRequest(unauthReq)
520 "unauthenticated request",
521 PermissionError.HTTPCode,
524 // Authenticated non-admin request returns PermissionError.
525 response = IssueRequest(userReq)
527 "authenticated non-admin request",
528 PermissionError.HTTPCode,
531 // Authenticated admin request for nonexistent block.
532 type deletecounter struct {
533 Deleted int `json:"copies_deleted"`
534 Failed int `json:"copies_failed"`
536 var responseDc, expectedDc deletecounter
538 response = IssueRequest(superuserNonexistentBlockReq)
540 "data manager request, nonexistent block",
544 // Authenticated admin request for existing block while EnableDelete is false.
545 theConfig.EnableDelete = false
546 response = IssueRequest(superuserExistingBlockReq)
548 "authenticated request, existing block, method disabled",
549 MethodDisabledError.HTTPCode,
551 theConfig.EnableDelete = true
553 // Authenticated admin request for existing block.
554 response = IssueRequest(superuserExistingBlockReq)
556 "data manager request, existing block",
559 // Expect response {"copies_deleted":1,"copies_failed":0}
560 expectedDc = deletecounter{1, 0}
561 json.NewDecoder(response.Body).Decode(&responseDc)
562 if responseDc != expectedDc {
563 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
564 expectedDc, responseDc)
566 // Confirm the block has been deleted
567 buf := make([]byte, BlockSize)
568 _, err := vols[0].Get(context.Background(), TestHash, buf)
569 var blockDeleted = os.IsNotExist(err)
571 t.Error("superuserExistingBlockReq: block not deleted")
574 // A DELETE request on a block newer than BlobSignatureTTL
575 // should return success but leave the block on the volume.
576 vols[0].Put(context.Background(), TestHash, TestBlock)
577 theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
579 response = IssueRequest(superuserExistingBlockReq)
581 "data manager request, existing block",
584 // Expect response {"copies_deleted":1,"copies_failed":0}
585 expectedDc = deletecounter{1, 0}
586 json.NewDecoder(response.Body).Decode(&responseDc)
587 if responseDc != expectedDc {
588 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
589 expectedDc, responseDc)
591 // Confirm the block has NOT been deleted.
592 _, err = vols[0].Get(context.Background(), TestHash, buf)
594 t.Errorf("testing delete on new block: %s\n", err)
600 // Test handling of the PUT /pull request.
602 // Cases tested: syntactically valid and invalid pull lists, from the
603 // data manager and from unprivileged users:
605 // 1. Valid pull list from an ordinary user
606 // (expected result: 401 Unauthorized)
608 // 2. Invalid pull request from an ordinary user
609 // (expected result: 401 Unauthorized)
611 // 3. Valid pull request from the data manager
612 // (expected result: 200 OK with response body "Received 3 pull
615 // 4. Invalid pull request from the data manager
616 // (expected result: 400 Bad Request)
618 // Test that in the end, the pull manager received a good pull list with
619 // the expected number of requests.
621 // TODO(twp): test concurrency: launch 100 goroutines to update the
622 // pull list simultaneously. Make sure that none of them return 400
623 // Bad Request and that pullq.GetList() returns a valid list.
625 func TestPullHandler(t *testing.T) {
628 var userToken = "USER TOKEN"
629 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
631 pullq = NewWorkQueue()
633 goodJSON := []byte(`[
635 "locator":"locator_with_two_servers",
642 "locator":"locator_with_no_servers",
647 "servers":["empty_locator"]
651 badJSON := []byte(`{ "key":"I'm a little teapot" }`)
653 type pullTest struct {
659 var testcases = []pullTest{
661 "Valid pull list from an ordinary user",
662 RequestTester{"/pull", userToken, "PUT", goodJSON},
663 http.StatusUnauthorized,
667 "Invalid pull request from an ordinary user",
668 RequestTester{"/pull", userToken, "PUT", badJSON},
669 http.StatusUnauthorized,
673 "Valid pull request from the data manager",
674 RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
676 "Received 3 pull requests\n",
679 "Invalid pull request from the data manager",
680 RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
681 http.StatusBadRequest,
686 for _, tst := range testcases {
687 response := IssueRequest(&tst.req)
688 ExpectStatusCode(t, tst.name, tst.responseCode, response)
689 ExpectBody(t, tst.name, tst.responseBody, response)
692 // The Keep pull manager should have received one good list with 3
694 for i := 0; i < 3; i++ {
695 item := <-pullq.NextItem
696 if _, ok := item.(PullRequest); !ok {
697 t.Errorf("item %v could not be parsed as a PullRequest", item)
701 expectChannelEmpty(t, pullq.NextItem)
708 // Cases tested: syntactically valid and invalid trash lists, from the
709 // data manager and from unprivileged users:
711 // 1. Valid trash list from an ordinary user
712 // (expected result: 401 Unauthorized)
714 // 2. Invalid trash list from an ordinary user
715 // (expected result: 401 Unauthorized)
717 // 3. Valid trash list from the data manager
718 // (expected result: 200 OK with response body "Received 3 trash
721 // 4. Invalid trash list from the data manager
722 // (expected result: 400 Bad Request)
724 // Test that in the end, the trash collector received a good
725 // trash list with the expected number of requests.
727 // TODO(twp): test concurrency: launch 100 goroutines to update the
728 // trash list simultaneously. Make sure that none of them return 400
729 // Bad Request and that replica.Dump() returns a valid list.
731 func TestTrashHandler(t *testing.T) {
734 var userToken = "USER TOKEN"
735 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
737 trashq = NewWorkQueue()
739 goodJSON := []byte(`[
742 "block_mtime":1409082153
746 "block_mtime":1409082153
750 "block_mtime":1409082153
754 badJSON := []byte(`I am not a valid JSON string`)
756 type trashTest struct {
763 var testcases = []trashTest{
765 "Valid trash list from an ordinary user",
766 RequestTester{"/trash", userToken, "PUT", goodJSON},
767 http.StatusUnauthorized,
771 "Invalid trash list from an ordinary user",
772 RequestTester{"/trash", userToken, "PUT", badJSON},
773 http.StatusUnauthorized,
777 "Valid trash list from the data manager",
778 RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
780 "Received 3 trash requests\n",
783 "Invalid trash list from the data manager",
784 RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
785 http.StatusBadRequest,
790 for _, tst := range testcases {
791 response := IssueRequest(&tst.req)
792 ExpectStatusCode(t, tst.name, tst.responseCode, response)
793 ExpectBody(t, tst.name, tst.responseBody, response)
796 // The trash collector should have received one good list with 3
798 for i := 0; i < 3; i++ {
799 item := <-trashq.NextItem
800 if _, ok := item.(TrashRequest); !ok {
801 t.Errorf("item %v could not be parsed as a TrashRequest", item)
805 expectChannelEmpty(t, trashq.NextItem)
808 // ====================
810 // ====================
812 // IssueRequest executes an HTTP request described by rt, to a
813 // REST router. It returns the HTTP response to the request.
814 func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
815 response := httptest.NewRecorder()
816 body := bytes.NewReader(rt.requestBody)
817 req, _ := http.NewRequest(rt.method, rt.uri, body)
818 if rt.apiToken != "" {
819 req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
821 loggingRouter := MakeRESTRouter()
822 loggingRouter.ServeHTTP(response, req)
826 // ExpectStatusCode checks whether a response has the specified status code,
827 // and reports a test failure if not.
828 func ExpectStatusCode(
832 response *httptest.ResponseRecorder) {
833 if response.Code != expectedStatus {
834 t.Errorf("%s: expected status %d, got %+v",
835 testname, expectedStatus, response)
843 response *httptest.ResponseRecorder) {
844 if expectedBody != "" && response.Body.String() != expectedBody {
845 t.Errorf("%s: expected response body '%s', got %+v",
846 testname, expectedBody, response)
851 func TestPutNeedsOnlyOneBuffer(t *testing.T) {
853 KeepVM = MakeTestVolumeManager(1)
856 defer func(orig *bufferPool) {
859 bufs = newBufferPool(1, BlockSize)
861 ok := make(chan struct{})
863 for i := 0; i < 2; i++ {
864 response := IssueRequest(
868 requestBody: TestBlock,
871 "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
878 case <-time.After(time.Second):
879 t.Fatal("PUT deadlocks with MaxBuffers==1")
883 // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
885 func TestPutHandlerNoBufferleak(t *testing.T) {
888 // Prepare two test Keep volumes.
889 KeepVM = MakeTestVolumeManager(2)
892 ok := make(chan bool)
894 for i := 0; i < theConfig.MaxBuffers+1; i++ {
895 // Unauthenticated request, no server key
896 // => OK (unsigned response)
897 unsignedLocator := "/" + TestHash
898 response := IssueRequest(
901 uri: unsignedLocator,
902 requestBody: TestBlock,
905 "TestPutHandlerBufferleak", http.StatusOK, response)
907 "TestPutHandlerBufferleak",
908 TestHashPutResp, response)
913 case <-time.After(20 * time.Second):
914 // If the buffer pool leaks, the test goroutine hangs.
915 t.Fatal("test did not finish, assuming pool leaked")
920 type notifyingResponseRecorder struct {
921 *httptest.ResponseRecorder
925 func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
929 func TestGetHandlerClientDisconnect(t *testing.T) {
930 defer func(was bool) {
931 theConfig.RequireSignatures = was
932 }(theConfig.RequireSignatures)
933 theConfig.RequireSignatures = false
935 defer func(orig *bufferPool) {
938 bufs = newBufferPool(1, BlockSize)
939 defer bufs.Put(bufs.Get(BlockSize))
941 KeepVM = MakeTestVolumeManager(2)
944 if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
948 resp := ¬ifyingResponseRecorder{
949 ResponseRecorder: httptest.NewRecorder(),
950 closer: make(chan bool, 1),
952 if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
953 t.Fatal("notifyingResponseRecorder is broken")
955 // If anyone asks, the client has disconnected.
958 ok := make(chan struct{})
960 req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
961 (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
966 case <-time.After(20 * time.Second):
967 t.Fatal("request took >20s, close notifier must be broken")
971 ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
972 for i, v := range KeepVM.AllWritable() {
973 if calls := v.(*MockVolume).called["GET"]; calls != 0 {
974 t.Errorf("volume %d got %d calls, expected 0", i, calls)
979 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
981 func TestGetHandlerNoBufferLeak(t *testing.T) {
984 // Prepare two test Keep volumes. Our block is stored on the second volume.
985 KeepVM = MakeTestVolumeManager(2)
988 vols := KeepVM.AllWritable()
989 if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
993 ok := make(chan bool)
995 for i := 0; i < theConfig.MaxBuffers+1; i++ {
996 // Unauthenticated request, unsigned locator
998 unsignedLocator := "/" + TestHash
999 response := IssueRequest(
1002 uri: unsignedLocator,
1005 "Unauthenticated request, unsigned locator", http.StatusOK, response)
1007 "Unauthenticated request, unsigned locator",
1014 case <-time.After(20 * time.Second):
1015 // If the buffer pool leaks, the test goroutine hangs.
1016 t.Fatal("test did not finish, assuming pool leaked")
1021 func TestPutReplicationHeader(t *testing.T) {
1024 KeepVM = MakeTestVolumeManager(2)
1025 defer KeepVM.Close()
1027 resp := IssueRequest(&RequestTester{
1029 uri: "/" + TestHash,
1030 requestBody: TestBlock,
1032 if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
1033 t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
1037 func TestUntrashHandler(t *testing.T) {
1040 // Set up Keep volumes
1041 KeepVM = MakeTestVolumeManager(2)
1042 defer KeepVM.Close()
1043 vols := KeepVM.AllWritable()
1044 vols[0].Put(context.Background(), TestHash, TestBlock)
1046 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1048 // unauthenticatedReq => UnauthorizedError
1049 unauthenticatedReq := &RequestTester{
1051 uri: "/untrash/" + TestHash,
1053 response := IssueRequest(unauthenticatedReq)
1055 "Unauthenticated request",
1056 UnauthorizedError.HTTPCode,
1059 // notDataManagerReq => UnauthorizedError
1060 notDataManagerReq := &RequestTester{
1062 uri: "/untrash/" + TestHash,
1063 apiToken: knownToken,
1066 response = IssueRequest(notDataManagerReq)
1068 "Non-datamanager token",
1069 UnauthorizedError.HTTPCode,
1072 // datamanagerWithBadHashReq => StatusBadRequest
1073 datamanagerWithBadHashReq := &RequestTester{
1075 uri: "/untrash/thisisnotalocator",
1076 apiToken: theConfig.systemAuthToken,
1078 response = IssueRequest(datamanagerWithBadHashReq)
1080 "Bad locator in untrash request",
1081 http.StatusBadRequest,
1084 // datamanagerWrongMethodReq => StatusBadRequest
1085 datamanagerWrongMethodReq := &RequestTester{
1087 uri: "/untrash/" + TestHash,
1088 apiToken: theConfig.systemAuthToken,
1090 response = IssueRequest(datamanagerWrongMethodReq)
1092 "Only PUT method is supported for untrash",
1093 http.StatusBadRequest,
1096 // datamanagerReq => StatusOK
1097 datamanagerReq := &RequestTester{
1099 uri: "/untrash/" + TestHash,
1100 apiToken: theConfig.systemAuthToken,
1102 response = IssueRequest(datamanagerReq)
1107 expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
1108 if response.Body.String() != expected {
1110 "Untrash response mismatched: expected %s, got:\n%s",
1111 expected, response.Body.String())
1115 func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
1118 // Set up readonly Keep volumes
1119 vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
1120 vols[0].Readonly = true
1121 vols[1].Readonly = true
1122 KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
1123 defer KeepVM.Close()
1125 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1127 // datamanagerReq => StatusNotFound
1128 datamanagerReq := &RequestTester{
1130 uri: "/untrash/" + TestHash,
1131 apiToken: theConfig.systemAuthToken,
1133 response := IssueRequest(datamanagerReq)
1135 "No writable volumes",
1136 http.StatusNotFound,