1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 // Tests for Keep HTTP handlers:
11 // The HTTP handlers are responsible for enforcing permission policy,
12 // so these tests must exercise all possible permission permutations.
29 "git.curoverse.com/arvados.git/sdk/go/arvados"
30 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
33 // A RequestTester represents the parameters for an HTTP request to
34 // be issued on behalf of a unit test.
35 type RequestTester struct {
42 // Test GetBlockHandler on the following situations:
43 // - permissions off, unauthenticated request, unsigned locator
44 // - permissions on, authenticated request, signed locator
45 // - permissions on, authenticated request, unsigned locator
46 // - permissions on, unauthenticated request, signed locator
47 // - permissions on, authenticated request, expired locator
49 func TestGetHandler(t *testing.T) {
52 // Prepare two test Keep volumes. Our block is stored on the second volume.
53 KeepVM = MakeTestVolumeManager(2)
56 vols := KeepVM.AllWritable()
57 if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
61 // Create locators for testing.
62 // Turn on permission settings so we can generate signed locators.
63 theConfig.RequireSignatures = true
64 theConfig.blobSigningKey = []byte(knownKey)
65 theConfig.BlobSignatureTTL.Set("5m")
68 unsignedLocator = "/" + TestHash
69 validTimestamp = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
70 expiredTimestamp = time.Now().Add(-time.Hour)
71 signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp)
72 expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
76 // Test unauthenticated request with permissions off.
77 theConfig.RequireSignatures = false
79 // Unauthenticated request, unsigned locator
81 response := IssueRequest(
87 "Unauthenticated request, unsigned locator", http.StatusOK, response)
89 "Unauthenticated request, unsigned locator",
93 receivedLen := response.Header().Get("Content-Length")
94 expectedLen := fmt.Sprintf("%d", len(TestBlock))
95 if receivedLen != expectedLen {
96 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
101 theConfig.RequireSignatures = true
103 // Authenticated request, signed locator
105 response = IssueRequest(&RequestTester{
108 apiToken: knownToken,
111 "Authenticated request, signed locator", http.StatusOK, response)
113 "Authenticated request, signed locator", string(TestBlock), response)
115 receivedLen = response.Header().Get("Content-Length")
116 expectedLen = fmt.Sprintf("%d", len(TestBlock))
117 if receivedLen != expectedLen {
118 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
121 // Authenticated request, unsigned locator
122 // => PermissionError
123 response = IssueRequest(&RequestTester{
125 uri: unsignedLocator,
126 apiToken: knownToken,
128 ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
130 // Unauthenticated request, signed locator
131 // => PermissionError
132 response = IssueRequest(&RequestTester{
137 "Unauthenticated request, signed locator",
138 PermissionError.HTTPCode, response)
140 // Authenticated request, expired locator
142 response = IssueRequest(&RequestTester{
145 apiToken: knownToken,
148 "Authenticated request, expired locator",
149 ExpiredError.HTTPCode, response)
152 // Test PutBlockHandler on the following situations:
154 // - with server key, authenticated request, unsigned locator
155 // - with server key, unauthenticated request, unsigned locator
157 func TestPutHandler(t *testing.T) {
160 // Prepare two test Keep volumes.
161 KeepVM = MakeTestVolumeManager(2)
167 // Unauthenticated request, no server key
168 // => OK (unsigned response)
169 unsignedLocator := "/" + TestHash
170 response := IssueRequest(
173 uri: unsignedLocator,
174 requestBody: TestBlock,
178 "Unauthenticated request, no server key", http.StatusOK, response)
180 "Unauthenticated request, no server key",
181 TestHashPutResp, response)
183 // ------------------
184 // With a server key.
186 theConfig.blobSigningKey = []byte(knownKey)
187 theConfig.BlobSignatureTTL.Set("5m")
189 // When a permission key is available, the locator returned
190 // from an authenticated PUT request will be signed.
192 // Authenticated PUT, signed locator
193 // => OK (signed response)
194 response = IssueRequest(
197 uri: unsignedLocator,
198 requestBody: TestBlock,
199 apiToken: knownToken,
203 "Authenticated PUT, signed locator, with server key",
204 http.StatusOK, response)
205 responseLocator := strings.TrimSpace(response.Body.String())
206 if VerifySignature(responseLocator, knownToken) != nil {
207 t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
208 "response '%s' does not contain a valid signature",
212 // Unauthenticated PUT, unsigned locator
214 response = IssueRequest(
217 uri: unsignedLocator,
218 requestBody: TestBlock,
222 "Unauthenticated PUT, unsigned locator, with server key",
223 http.StatusOK, response)
225 "Unauthenticated PUT, unsigned locator, with server key",
226 TestHashPutResp, response)
229 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
231 theConfig.systemAuthToken = "fake-data-manager-token"
232 vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
233 vols[0].Readonly = true
234 KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
240 requestBody: TestBlock,
242 defer func(orig bool) {
243 theConfig.EnableDelete = orig
244 }(theConfig.EnableDelete)
245 theConfig.EnableDelete = true
250 requestBody: TestBlock,
251 apiToken: theConfig.systemAuthToken,
258 for _, e := range []expect{
270 if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
271 t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
276 // Test /index requests:
277 // - unauthenticated /index request
278 // - unauthenticated /index/prefix request
279 // - authenticated /index request | non-superuser
280 // - authenticated /index/prefix request | non-superuser
281 // - authenticated /index request | superuser
282 // - authenticated /index/prefix request | superuser
284 // The only /index requests that should succeed are those issued by the
285 // superuser. They should pass regardless of the value of RequireSignatures.
287 func TestIndexHandler(t *testing.T) {
290 // Set up Keep volumes and populate them.
291 // Include multiple blocks on different volumes, and
292 // some metadata files (which should be omitted from index listings)
293 KeepVM = MakeTestVolumeManager(2)
296 vols := KeepVM.AllWritable()
297 vols[0].Put(context.Background(), TestHash, TestBlock)
298 vols[1].Put(context.Background(), TestHash2, TestBlock2)
299 vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
300 vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
302 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
304 unauthenticatedReq := &RequestTester{
308 authenticatedReq := &RequestTester{
311 apiToken: knownToken,
313 superuserReq := &RequestTester{
316 apiToken: theConfig.systemAuthToken,
318 unauthPrefixReq := &RequestTester{
320 uri: "/index/" + TestHash[0:3],
322 authPrefixReq := &RequestTester{
324 uri: "/index/" + TestHash[0:3],
325 apiToken: knownToken,
327 superuserPrefixReq := &RequestTester{
329 uri: "/index/" + TestHash[0:3],
330 apiToken: theConfig.systemAuthToken,
332 superuserNoSuchPrefixReq := &RequestTester{
335 apiToken: theConfig.systemAuthToken,
337 superuserInvalidPrefixReq := &RequestTester{
340 apiToken: theConfig.systemAuthToken,
343 // -------------------------------------------------------------
344 // Only the superuser should be allowed to issue /index requests.
346 // ---------------------------
347 // RequireSignatures enabled
348 // This setting should not affect tests passing.
349 theConfig.RequireSignatures = true
351 // unauthenticated /index request
352 // => UnauthorizedError
353 response := IssueRequest(unauthenticatedReq)
355 "RequireSignatures on, unauthenticated request",
356 UnauthorizedError.HTTPCode,
359 // unauthenticated /index/prefix request
360 // => UnauthorizedError
361 response = IssueRequest(unauthPrefixReq)
363 "permissions on, unauthenticated /index/prefix request",
364 UnauthorizedError.HTTPCode,
367 // authenticated /index request, non-superuser
368 // => UnauthorizedError
369 response = IssueRequest(authenticatedReq)
371 "permissions on, authenticated request, non-superuser",
372 UnauthorizedError.HTTPCode,
375 // authenticated /index/prefix request, non-superuser
376 // => UnauthorizedError
377 response = IssueRequest(authPrefixReq)
379 "permissions on, authenticated /index/prefix request, non-superuser",
380 UnauthorizedError.HTTPCode,
383 // superuser /index request
385 response = IssueRequest(superuserReq)
387 "permissions on, superuser request",
391 // ----------------------------
392 // RequireSignatures disabled
393 // Valid Request should still pass.
394 theConfig.RequireSignatures = false
396 // superuser /index request
398 response = IssueRequest(superuserReq)
400 "permissions on, superuser request",
404 expected := `^` + TestHash + `\+\d+ \d+\n` +
405 TestHash2 + `\+\d+ \d+\n\n$`
406 match, _ := regexp.MatchString(expected, response.Body.String())
409 "permissions on, superuser request: expected %s, got:\n%s",
410 expected, response.Body.String())
413 // superuser /index/prefix request
415 response = IssueRequest(superuserPrefixReq)
417 "permissions on, superuser request",
421 expected = `^` + TestHash + `\+\d+ \d+\n\n$`
422 match, _ = regexp.MatchString(expected, response.Body.String())
425 "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
426 expected, response.Body.String())
429 // superuser /index/{no-such-prefix} request
431 response = IssueRequest(superuserNoSuchPrefixReq)
433 "permissions on, superuser request",
437 if "\n" != response.Body.String() {
438 t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
441 // superuser /index/{invalid-prefix} request
442 // => StatusBadRequest
443 response = IssueRequest(superuserInvalidPrefixReq)
445 "permissions on, superuser request",
446 http.StatusBadRequest,
454 // With no token and with a non-data-manager token:
455 // * Delete existing block
456 // (test for 403 Forbidden, confirm block not deleted)
458 // With data manager token:
460 // * Delete existing block
461 // (test for 200 OK, response counts, confirm block deleted)
463 // * Delete nonexistent block
464 // (test for 200 OK, response counts)
468 // * Delete block on read-only and read-write volume
469 // (test for 200 OK, response with copies_deleted=1,
470 // copies_failed=1, confirm block deleted only on r/w volume)
472 // * Delete block on read-only volume only
473 // (test for 200 OK, response with copies_deleted=0, copies_failed=1,
474 // confirm block not deleted)
476 func TestDeleteHandler(t *testing.T) {
479 // Set up Keep volumes and populate them.
480 // Include multiple blocks on different volumes, and
481 // some metadata files (which should be omitted from index listings)
482 KeepVM = MakeTestVolumeManager(2)
485 vols := KeepVM.AllWritable()
486 vols[0].Put(context.Background(), TestHash, TestBlock)
488 // Explicitly set the BlobSignatureTTL to 0 for these
489 // tests, to ensure the MockVolume deletes the blocks
490 // even though they have just been created.
491 theConfig.BlobSignatureTTL = arvados.Duration(0)
493 var userToken = "NOT DATA MANAGER TOKEN"
494 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
496 theConfig.EnableDelete = true
498 unauthReq := &RequestTester{
503 userReq := &RequestTester{
509 superuserExistingBlockReq := &RequestTester{
512 apiToken: theConfig.systemAuthToken,
515 superuserNonexistentBlockReq := &RequestTester{
517 uri: "/" + TestHash2,
518 apiToken: theConfig.systemAuthToken,
521 // Unauthenticated request returns PermissionError.
522 var response *httptest.ResponseRecorder
523 response = IssueRequest(unauthReq)
525 "unauthenticated request",
526 PermissionError.HTTPCode,
529 // Authenticated non-admin request returns PermissionError.
530 response = IssueRequest(userReq)
532 "authenticated non-admin request",
533 PermissionError.HTTPCode,
536 // Authenticated admin request for nonexistent block.
537 type deletecounter struct {
538 Deleted int `json:"copies_deleted"`
539 Failed int `json:"copies_failed"`
541 var responseDc, expectedDc deletecounter
543 response = IssueRequest(superuserNonexistentBlockReq)
545 "data manager request, nonexistent block",
549 // Authenticated admin request for existing block while EnableDelete is false.
550 theConfig.EnableDelete = false
551 response = IssueRequest(superuserExistingBlockReq)
553 "authenticated request, existing block, method disabled",
554 MethodDisabledError.HTTPCode,
556 theConfig.EnableDelete = true
558 // Authenticated admin request for existing block.
559 response = IssueRequest(superuserExistingBlockReq)
561 "data manager request, existing block",
564 // Expect response {"copies_deleted":1,"copies_failed":0}
565 expectedDc = deletecounter{1, 0}
566 json.NewDecoder(response.Body).Decode(&responseDc)
567 if responseDc != expectedDc {
568 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
569 expectedDc, responseDc)
571 // Confirm the block has been deleted
572 buf := make([]byte, BlockSize)
573 _, err := vols[0].Get(context.Background(), TestHash, buf)
574 var blockDeleted = os.IsNotExist(err)
576 t.Error("superuserExistingBlockReq: block not deleted")
579 // A DELETE request on a block newer than BlobSignatureTTL
580 // should return success but leave the block on the volume.
581 vols[0].Put(context.Background(), TestHash, TestBlock)
582 theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
584 response = IssueRequest(superuserExistingBlockReq)
586 "data manager request, existing block",
589 // Expect response {"copies_deleted":1,"copies_failed":0}
590 expectedDc = deletecounter{1, 0}
591 json.NewDecoder(response.Body).Decode(&responseDc)
592 if responseDc != expectedDc {
593 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
594 expectedDc, responseDc)
596 // Confirm the block has NOT been deleted.
597 _, err = vols[0].Get(context.Background(), TestHash, buf)
599 t.Errorf("testing delete on new block: %s\n", err)
605 // Test handling of the PUT /pull statement.
607 // Cases tested: syntactically valid and invalid pull lists, from the
608 // data manager and from unprivileged users:
610 // 1. Valid pull list from an ordinary user
611 // (expected result: 401 Unauthorized)
613 // 2. Invalid pull request from an ordinary user
614 // (expected result: 401 Unauthorized)
616 // 3. Valid pull request from the data manager
617 // (expected result: 200 OK with request body "Received 3 pull
620 // 4. Invalid pull request from the data manager
621 // (expected result: 400 Bad Request)
623 // Test that in the end, the pull manager received a good pull list with
624 // the expected number of requests.
626 // TODO(twp): test concurrency: launch 100 goroutines to update the
627 // pull list simultaneously. Make sure that none of them return 400
628 // Bad Request and that pullq.GetList() returns a valid list.
630 func TestPullHandler(t *testing.T) {
633 var userToken = "USER TOKEN"
634 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
636 pullq = NewWorkQueue()
638 goodJSON := []byte(`[
640 "locator":"locator_with_two_servers",
647 "locator":"locator_with_no_servers",
652 "servers":["empty_locator"]
656 badJSON := []byte(`{ "key":"I'm a little teapot" }`)
658 type pullTest struct {
664 var testcases = []pullTest{
666 "Valid pull list from an ordinary user",
667 RequestTester{"/pull", userToken, "PUT", goodJSON},
668 http.StatusUnauthorized,
672 "Invalid pull request from an ordinary user",
673 RequestTester{"/pull", userToken, "PUT", badJSON},
674 http.StatusUnauthorized,
678 "Valid pull request from the data manager",
679 RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
681 "Received 3 pull requests\n",
684 "Invalid pull request from the data manager",
685 RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
686 http.StatusBadRequest,
691 for _, tst := range testcases {
692 response := IssueRequest(&tst.req)
693 ExpectStatusCode(t, tst.name, tst.responseCode, response)
694 ExpectBody(t, tst.name, tst.responseBody, response)
697 // The Keep pull manager should have received one good list with 3
699 for i := 0; i < 3; i++ {
700 item := <-pullq.NextItem
701 if _, ok := item.(PullRequest); !ok {
702 t.Errorf("item %v could not be parsed as a PullRequest", item)
706 expectChannelEmpty(t, pullq.NextItem)
713 // Cases tested: syntactically valid and invalid trash lists, from the
714 // data manager and from unprivileged users:
716 // 1. Valid trash list from an ordinary user
717 // (expected result: 401 Unauthorized)
719 // 2. Invalid trash list from an ordinary user
720 // (expected result: 401 Unauthorized)
722 // 3. Valid trash list from the data manager
723 // (expected result: 200 OK with request body "Received 3 trash
726 // 4. Invalid trash list from the data manager
727 // (expected result: 400 Bad Request)
729 // Test that in the end, the trash collector received a good list
730 // trash list with the expected number of requests.
732 // TODO(twp): test concurrency: launch 100 goroutines to update the
733 // pull list simultaneously. Make sure that none of them return 400
734 // Bad Request and that replica.Dump() returns a valid list.
736 func TestTrashHandler(t *testing.T) {
739 var userToken = "USER TOKEN"
740 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
742 trashq = NewWorkQueue()
744 goodJSON := []byte(`[
747 "block_mtime":1409082153
751 "block_mtime":1409082153
755 "block_mtime":1409082153
759 badJSON := []byte(`I am not a valid JSON string`)
761 type trashTest struct {
768 var testcases = []trashTest{
770 "Valid trash list from an ordinary user",
771 RequestTester{"/trash", userToken, "PUT", goodJSON},
772 http.StatusUnauthorized,
776 "Invalid trash list from an ordinary user",
777 RequestTester{"/trash", userToken, "PUT", badJSON},
778 http.StatusUnauthorized,
782 "Valid trash list from the data manager",
783 RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
785 "Received 3 trash requests\n",
788 "Invalid trash list from the data manager",
789 RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
790 http.StatusBadRequest,
795 for _, tst := range testcases {
796 response := IssueRequest(&tst.req)
797 ExpectStatusCode(t, tst.name, tst.responseCode, response)
798 ExpectBody(t, tst.name, tst.responseBody, response)
801 // The trash collector should have received one good list with 3
803 for i := 0; i < 3; i++ {
804 item := <-trashq.NextItem
805 if _, ok := item.(TrashRequest); !ok {
806 t.Errorf("item %v could not be parsed as a TrashRequest", item)
810 expectChannelEmpty(t, trashq.NextItem)
813 // ====================
815 // ====================
817 // IssueTestRequest executes an HTTP request described by rt, to a
818 // REST router. It returns the HTTP response to the request.
819 func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
820 response := httptest.NewRecorder()
821 body := bytes.NewReader(rt.requestBody)
822 req, _ := http.NewRequest(rt.method, rt.uri, body)
823 if rt.apiToken != "" {
824 req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
826 loggingRouter := MakeRESTRouter()
827 loggingRouter.ServeHTTP(response, req)
831 func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
832 response := httptest.NewRecorder()
833 body := bytes.NewReader(rt.requestBody)
834 req, _ := http.NewRequest(rt.method, rt.uri, body)
835 if rt.apiToken != "" {
836 req.Header.Set("Authorization", "Bearer "+rt.apiToken)
838 loggingRouter := MakeRESTRouter()
839 loggingRouter.ServeHTTP(response, req)
843 // ExpectStatusCode checks whether a response has the specified status code,
844 // and reports a test failure if not.
845 func ExpectStatusCode(
849 response *httptest.ResponseRecorder) {
850 if response.Code != expectedStatus {
851 t.Errorf("%s: expected status %d, got %+v",
852 testname, expectedStatus, response)
860 response *httptest.ResponseRecorder) {
861 if expectedBody != "" && response.Body.String() != expectedBody {
862 t.Errorf("%s: expected response body '%s', got %+v",
863 testname, expectedBody, response)
868 func TestPutNeedsOnlyOneBuffer(t *testing.T) {
870 KeepVM = MakeTestVolumeManager(1)
873 defer func(orig *bufferPool) {
876 bufs = newBufferPool(1, BlockSize)
878 ok := make(chan struct{})
880 for i := 0; i < 2; i++ {
881 response := IssueRequest(
885 requestBody: TestBlock,
888 "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
895 case <-time.After(time.Second):
896 t.Fatal("PUT deadlocks with MaxBuffers==1")
900 // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
902 func TestPutHandlerNoBufferleak(t *testing.T) {
905 // Prepare two test Keep volumes.
906 KeepVM = MakeTestVolumeManager(2)
909 ok := make(chan bool)
911 for i := 0; i < theConfig.MaxBuffers+1; i++ {
912 // Unauthenticated request, no server key
913 // => OK (unsigned response)
914 unsignedLocator := "/" + TestHash
915 response := IssueRequest(
918 uri: unsignedLocator,
919 requestBody: TestBlock,
922 "TestPutHandlerBufferleak", http.StatusOK, response)
924 "TestPutHandlerBufferleak",
925 TestHashPutResp, response)
930 case <-time.After(20 * time.Second):
931 // If the buffer pool leaks, the test goroutine hangs.
932 t.Fatal("test did not finish, assuming pool leaked")
937 type notifyingResponseRecorder struct {
938 *httptest.ResponseRecorder
942 func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
946 func TestGetHandlerClientDisconnect(t *testing.T) {
947 defer func(was bool) {
948 theConfig.RequireSignatures = was
949 }(theConfig.RequireSignatures)
950 theConfig.RequireSignatures = false
952 defer func(orig *bufferPool) {
955 bufs = newBufferPool(1, BlockSize)
956 defer bufs.Put(bufs.Get(BlockSize))
958 KeepVM = MakeTestVolumeManager(2)
961 if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
965 resp := ¬ifyingResponseRecorder{
966 ResponseRecorder: httptest.NewRecorder(),
967 closer: make(chan bool, 1),
969 if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
970 t.Fatal("notifyingResponseRecorder is broken")
972 // If anyone asks, the client has disconnected.
975 ok := make(chan struct{})
977 req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
978 (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
983 case <-time.After(20 * time.Second):
984 t.Fatal("request took >20s, close notifier must be broken")
988 ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
989 for i, v := range KeepVM.AllWritable() {
990 if calls := v.(*MockVolume).called["GET"]; calls != 0 {
991 t.Errorf("volume %d got %d calls, expected 0", i, calls)
996 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
998 func TestGetHandlerNoBufferLeak(t *testing.T) {
1001 // Prepare two test Keep volumes. Our block is stored on the second volume.
1002 KeepVM = MakeTestVolumeManager(2)
1003 defer KeepVM.Close()
1005 vols := KeepVM.AllWritable()
1006 if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
1010 ok := make(chan bool)
1012 for i := 0; i < theConfig.MaxBuffers+1; i++ {
1013 // Unauthenticated request, unsigned locator
1015 unsignedLocator := "/" + TestHash
1016 response := IssueRequest(
1019 uri: unsignedLocator,
1022 "Unauthenticated request, unsigned locator", http.StatusOK, response)
1024 "Unauthenticated request, unsigned locator",
1031 case <-time.After(20 * time.Second):
1032 // If the buffer pool leaks, the test goroutine hangs.
1033 t.Fatal("test did not finish, assuming pool leaked")
1038 func TestPutReplicationHeader(t *testing.T) {
1041 KeepVM = MakeTestVolumeManager(2)
1042 defer KeepVM.Close()
1044 resp := IssueRequest(&RequestTester{
1046 uri: "/" + TestHash,
1047 requestBody: TestBlock,
1049 if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
1050 t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
1054 func TestUntrashHandler(t *testing.T) {
1057 // Set up Keep volumes
1058 KeepVM = MakeTestVolumeManager(2)
1059 defer KeepVM.Close()
1060 vols := KeepVM.AllWritable()
1061 vols[0].Put(context.Background(), TestHash, TestBlock)
1063 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1065 // unauthenticatedReq => UnauthorizedError
1066 unauthenticatedReq := &RequestTester{
1068 uri: "/untrash/" + TestHash,
1070 response := IssueRequest(unauthenticatedReq)
1072 "Unauthenticated request",
1073 UnauthorizedError.HTTPCode,
1076 // notDataManagerReq => UnauthorizedError
1077 notDataManagerReq := &RequestTester{
1079 uri: "/untrash/" + TestHash,
1080 apiToken: knownToken,
1083 response = IssueRequest(notDataManagerReq)
1085 "Non-datamanager token",
1086 UnauthorizedError.HTTPCode,
1089 // datamanagerWithBadHashReq => StatusBadRequest
1090 datamanagerWithBadHashReq := &RequestTester{
1092 uri: "/untrash/thisisnotalocator",
1093 apiToken: theConfig.systemAuthToken,
1095 response = IssueRequest(datamanagerWithBadHashReq)
1097 "Bad locator in untrash request",
1098 http.StatusBadRequest,
1101 // datamanagerWrongMethodReq => StatusBadRequest
1102 datamanagerWrongMethodReq := &RequestTester{
1104 uri: "/untrash/" + TestHash,
1105 apiToken: theConfig.systemAuthToken,
1107 response = IssueRequest(datamanagerWrongMethodReq)
1109 "Only PUT method is supported for untrash",
1110 http.StatusBadRequest,
1113 // datamanagerReq => StatusOK
1114 datamanagerReq := &RequestTester{
1116 uri: "/untrash/" + TestHash,
1117 apiToken: theConfig.systemAuthToken,
1119 response = IssueRequest(datamanagerReq)
1124 expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
1125 if response.Body.String() != expected {
1127 "Untrash response mismatched: expected %s, got:\n%s",
1128 expected, response.Body.String())
1132 func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
1135 // Set up readonly Keep volumes
1136 vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
1137 vols[0].Readonly = true
1138 vols[1].Readonly = true
1139 KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
1140 defer KeepVM.Close()
1142 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1144 // datamanagerReq => StatusOK
1145 datamanagerReq := &RequestTester{
1147 uri: "/untrash/" + TestHash,
1148 apiToken: theConfig.systemAuthToken,
1150 response := IssueRequest(datamanagerReq)
1152 "No writable volumes",
1153 http.StatusNotFound,
1157 func TestHealthCheckPing(t *testing.T) {
1158 theConfig.ManagementToken = arvadostest.ManagementToken
1159 pingReq := &RequestTester{
1161 uri: "/_health/ping",
1162 apiToken: arvadostest.ManagementToken,
1164 response := IssueHealthCheckRequest(pingReq)
1169 want := `{"health":"OK"}`
1170 if !strings.Contains(response.Body.String(), want) {
1171 t.Errorf("expected response to include %s: got %s", want, response.Body.String())