X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d44a5c508cfa664134daad806d7be9a7cb0bd6ee..9fbb2bbd4e2a0f1d15e1db3f3d606cdedae825a7:/services/keepstore/handler_test.go diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 0829ce9f92..c181982a13 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -13,7 +13,6 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/gorilla/mux" "net/http" "net/http/httptest" "os" @@ -44,25 +43,22 @@ func TestGetHandler(t *testing.T) { // Prepare two test Keep volumes. Our block is stored on the second volume. KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Quit() + defer KeepVM.Close() - vols := KeepVM.Volumes() + vols := KeepVM.AllWritable() if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil { t.Error(err) } - // Set up a REST router for testing the handlers. - rest := MakeRESTRouter() - // Create locators for testing. // Turn on permission settings so we can generate signed locators. enforce_permissions = true PermissionSecret = []byte(known_key) - permission_ttl = time.Duration(300) * time.Second + blob_signature_ttl = 300 * time.Second var ( unsigned_locator = "/" + TEST_HASH - valid_timestamp = time.Now().Add(permission_ttl) + valid_timestamp = time.Now().Add(blob_signature_ttl) expired_timestamp = time.Now().Add(-time.Hour) signed_locator = "/" + SignLocator(TEST_HASH, known_token, valid_timestamp) expired_locator = "/" + SignLocator(TEST_HASH, known_token, expired_timestamp) @@ -74,7 +70,7 @@ func TestGetHandler(t *testing.T) { // Unauthenticated request, unsigned locator // => OK - response := IssueRequest(rest, + response := IssueRequest( &RequestTester{ method: "GET", uri: unsigned_locator, @@ -85,10 +81,11 @@ func TestGetHandler(t *testing.T) { "Unauthenticated request, unsigned locator", string(TEST_BLOCK), response) - received_xbs := response.Header().Get("X-Block-Size") - expected_xbs := fmt.Sprintf("%d", len(TEST_BLOCK)) - if received_xbs != expected_xbs { - t.Errorf("expected X-Block-Size %s, got %s", expected_xbs, received_xbs) + + received_cl := response.Header().Get("Content-Length") + expected_cl := fmt.Sprintf("%d", len(TEST_BLOCK)) + if received_cl != expected_cl { + t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl) } // ---------------- @@ -97,7 +94,7 @@ func TestGetHandler(t *testing.T) { // Authenticated request, signed locator // => OK - response = IssueRequest(rest, &RequestTester{ + response = IssueRequest(&RequestTester{ method: "GET", uri: signed_locator, api_token: known_token, @@ -106,15 +103,16 @@ func TestGetHandler(t *testing.T) { "Authenticated request, signed locator", http.StatusOK, response) ExpectBody(t, "Authenticated request, signed locator", string(TEST_BLOCK), response) - received_xbs = response.Header().Get("X-Block-Size") - expected_xbs = fmt.Sprintf("%d", len(TEST_BLOCK)) - if received_xbs != expected_xbs { - t.Errorf("expected X-Block-Size %s, got %s", expected_xbs, received_xbs) + + received_cl = response.Header().Get("Content-Length") + expected_cl = fmt.Sprintf("%d", len(TEST_BLOCK)) + if received_cl != expected_cl { + t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl) } // Authenticated request, unsigned locator // => PermissionError - response = IssueRequest(rest, &RequestTester{ + response = IssueRequest(&RequestTester{ method: "GET", uri: unsigned_locator, api_token: known_token, @@ -123,7 +121,7 @@ func TestGetHandler(t *testing.T) { // Unauthenticated request, signed locator // => PermissionError - response = IssueRequest(rest, &RequestTester{ + response = IssueRequest(&RequestTester{ method: "GET", uri: signed_locator, }) @@ -133,7 +131,7 @@ func TestGetHandler(t *testing.T) { // Authenticated request, expired locator // => ExpiredError - response = IssueRequest(rest, &RequestTester{ + response = IssueRequest(&RequestTester{ method: "GET", uri: expired_locator, api_token: known_token, @@ -153,10 +151,7 @@ func TestPutHandler(t *testing.T) { // Prepare two test Keep volumes. KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Quit() - - // Set up a REST router for testing the handlers. - rest := MakeRESTRouter() + defer KeepVM.Close() // -------------- // No server key. @@ -164,7 +159,7 @@ func TestPutHandler(t *testing.T) { // Unauthenticated request, no server key // => OK (unsigned response) unsigned_locator := "/" + TEST_HASH - response := IssueRequest(rest, + response := IssueRequest( &RequestTester{ method: "PUT", uri: unsigned_locator, @@ -181,14 +176,14 @@ func TestPutHandler(t *testing.T) { // With a server key. PermissionSecret = []byte(known_key) - permission_ttl = time.Duration(300) * time.Second + blob_signature_ttl = 300 * time.Second // When a permission key is available, the locator returned // from an authenticated PUT request will be signed. // Authenticated PUT, signed locator // => OK (signed response) - response = IssueRequest(rest, + response = IssueRequest( &RequestTester{ method: "PUT", uri: unsigned_locator, @@ -200,7 +195,7 @@ func TestPutHandler(t *testing.T) { "Authenticated PUT, signed locator, with server key", http.StatusOK, response) response_locator := strings.TrimSpace(response.Body.String()) - if !VerifySignature(response_locator, known_token) { + if VerifySignature(response_locator, known_token) != nil { t.Errorf("Authenticated PUT, signed locator, with server key:\n"+ "response '%s' does not contain a valid signature", response_locator) @@ -208,7 +203,7 @@ func TestPutHandler(t *testing.T) { // Unauthenticated PUT, unsigned locator // => OK - response = IssueRequest(rest, + response = IssueRequest( &RequestTester{ method: "PUT", uri: unsigned_locator, @@ -223,22 +218,56 @@ func TestPutHandler(t *testing.T) { TEST_HASH_PUT_RESPONSE, response) } +func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) { + defer teardown() + data_manager_token = "fake-data-manager-token" + vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()} + vols[0].Readonly = true + KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]}) + defer KeepVM.Close() + IssueRequest( + &RequestTester{ + method: "PUT", + uri: "/" + TEST_HASH, + request_body: TEST_BLOCK, + }) + IssueRequest( + &RequestTester{ + method: "DELETE", + uri: "/" + TEST_HASH, + request_body: TEST_BLOCK, + api_token: data_manager_token, + }) + type expect struct { + volnum int + method string + callcount int + } + for _, e := range []expect{ + {0, "Get", 0}, + {0, "Touch", 0}, + {0, "Put", 0}, + {0, "Delete", 0}, + {1, "Get", 1}, + {1, "Put", 1}, + {1, "Delete", 1}, + } { + if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount { + t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount) + } + } +} + // Test /index requests: -// - enforce_permissions off | unauthenticated /index request -// - enforce_permissions off | unauthenticated /index/prefix request -// - enforce_permissions off | authenticated /index request | non-superuser -// - enforce_permissions off | authenticated /index/prefix request | non-superuser -// - enforce_permissions off | authenticated /index request | superuser -// - enforce_permissions off | authenticated /index/prefix request | superuser -// - enforce_permissions on | unauthenticated /index request -// - enforce_permissions on | unauthenticated /index/prefix request -// - enforce_permissions on | authenticated /index request | non-superuser -// - enforce_permissions on | authenticated /index/prefix request | non-superuser -// - enforce_permissions on | authenticated /index request | superuser -// - enforce_permissions on | authenticated /index/prefix request | superuser +// - unauthenticated /index request +// - unauthenticated /index/prefix request +// - authenticated /index request | non-superuser +// - authenticated /index/prefix request | non-superuser +// - authenticated /index request | superuser +// - authenticated /index/prefix request | superuser // // The only /index requests that should succeed are those issued by the -// superuser when enforce_permissions = true. +// superuser. They should pass regardless of the value of enforce_permissions. // func TestIndexHandler(t *testing.T) { defer teardown() @@ -247,17 +276,14 @@ func TestIndexHandler(t *testing.T) { // Include multiple blocks on different volumes, and // some metadata files (which should be omitted from index listings) KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Quit() + defer KeepVM.Close() - vols := KeepVM.Volumes() + vols := KeepVM.AllWritable() vols[0].Put(TEST_HASH, TEST_BLOCK) vols[1].Put(TEST_HASH_2, TEST_BLOCK_2) vols[0].Put(TEST_HASH+".meta", []byte("metadata")) vols[1].Put(TEST_HASH_2+".meta", []byte("metadata")) - // Set up a REST router for testing the handlers. - rest := MakeRESTRouter() - data_manager_token = "DATA MANAGER TOKEN" unauthenticated_req := &RequestTester{ @@ -289,106 +315,69 @@ func TestIndexHandler(t *testing.T) { api_token: data_manager_token, } - // ---------------------------- - // enforce_permissions disabled - // All /index requests should fail. - enforce_permissions = false - - // unauthenticated /index request - // => PermissionError - response := IssueRequest(rest, unauthenticated_req) - ExpectStatusCode(t, - "enforce_permissions off, unauthenticated request", - PermissionError.HTTPCode, - response) - - // unauthenticated /index/prefix request - // => PermissionError - response = IssueRequest(rest, unauth_prefix_req) - ExpectStatusCode(t, - "enforce_permissions off, unauthenticated /index/prefix request", - PermissionError.HTTPCode, - response) - - // authenticated /index request, non-superuser - // => PermissionError - response = IssueRequest(rest, authenticated_req) - ExpectStatusCode(t, - "enforce_permissions off, authenticated request, non-superuser", - PermissionError.HTTPCode, - response) - - // authenticated /index/prefix request, non-superuser - // => PermissionError - response = IssueRequest(rest, auth_prefix_req) - ExpectStatusCode(t, - "enforce_permissions off, authenticated /index/prefix request, non-superuser", - PermissionError.HTTPCode, - response) - - // authenticated /index request, superuser - // => PermissionError - response = IssueRequest(rest, superuser_req) - ExpectStatusCode(t, - "enforce_permissions off, superuser request", - PermissionError.HTTPCode, - response) - - // superuser /index/prefix request - // => PermissionError - response = IssueRequest(rest, superuser_prefix_req) - ExpectStatusCode(t, - "enforce_permissions off, superuser /index/prefix request", - PermissionError.HTTPCode, - response) + // ------------------------------------------------------------- + // Only the superuser should be allowed to issue /index requests. // --------------------------- // enforce_permissions enabled - // Only the superuser should be allowed to issue /index requests. + // This setting should not affect tests passing. enforce_permissions = true // unauthenticated /index request - // => PermissionError - response = IssueRequest(rest, unauthenticated_req) + // => UnauthorizedError + response := IssueRequest(unauthenticated_req) ExpectStatusCode(t, "enforce_permissions on, unauthenticated request", - PermissionError.HTTPCode, + UnauthorizedError.HTTPCode, response) // unauthenticated /index/prefix request - // => PermissionError - response = IssueRequest(rest, unauth_prefix_req) + // => UnauthorizedError + response = IssueRequest(unauth_prefix_req) ExpectStatusCode(t, "permissions on, unauthenticated /index/prefix request", - PermissionError.HTTPCode, + UnauthorizedError.HTTPCode, response) // authenticated /index request, non-superuser - // => PermissionError - response = IssueRequest(rest, authenticated_req) + // => UnauthorizedError + response = IssueRequest(authenticated_req) ExpectStatusCode(t, "permissions on, authenticated request, non-superuser", - PermissionError.HTTPCode, + UnauthorizedError.HTTPCode, response) // authenticated /index/prefix request, non-superuser - // => PermissionError - response = IssueRequest(rest, auth_prefix_req) + // => UnauthorizedError + response = IssueRequest(auth_prefix_req) ExpectStatusCode(t, "permissions on, authenticated /index/prefix request, non-superuser", - PermissionError.HTTPCode, + UnauthorizedError.HTTPCode, response) // superuser /index request // => OK - response = IssueRequest(rest, superuser_req) + response = IssueRequest(superuser_req) + ExpectStatusCode(t, + "permissions on, superuser request", + http.StatusOK, + response) + + // ---------------------------- + // enforce_permissions disabled + // Valid Request should still pass. + enforce_permissions = false + + // superuser /index request + // => OK + response = IssueRequest(superuser_req) ExpectStatusCode(t, "permissions on, superuser request", http.StatusOK, response) expected := `^` + TEST_HASH + `\+\d+ \d+\n` + - TEST_HASH_2 + `\+\d+ \d+\n$` + TEST_HASH_2 + `\+\d+ \d+\n\n$` match, _ := regexp.MatchString(expected, response.Body.String()) if !match { t.Errorf( @@ -398,13 +387,13 @@ func TestIndexHandler(t *testing.T) { // superuser /index/prefix request // => OK - response = IssueRequest(rest, superuser_prefix_req) + response = IssueRequest(superuser_prefix_req) ExpectStatusCode(t, "permissions on, superuser request", http.StatusOK, response) - expected = `^` + TEST_HASH + `\+\d+ \d+\n$` + expected = `^` + TEST_HASH + `\+\d+ \d+\n\n$` match, _ = regexp.MatchString(expected, response.Body.String()) if !match { t.Errorf( @@ -446,18 +435,15 @@ func TestDeleteHandler(t *testing.T) { // Include multiple blocks on different volumes, and // some metadata files (which should be omitted from index listings) KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Quit() + defer KeepVM.Close() - vols := KeepVM.Volumes() + vols := KeepVM.AllWritable() vols[0].Put(TEST_HASH, TEST_BLOCK) - // Explicitly set the permission_ttl to 0 for these + // Explicitly set the blob_signature_ttl to 0 for these // tests, to ensure the MockVolume deletes the blocks // even though they have just been created. - permission_ttl = time.Duration(0) - - // Set up a REST router for testing the handlers. - rest := MakeRESTRouter() + blob_signature_ttl = time.Duration(0) var user_token = "NOT DATA MANAGER TOKEN" data_manager_token = "DATA MANAGER TOKEN" @@ -487,14 +473,14 @@ func TestDeleteHandler(t *testing.T) { // Unauthenticated request returns PermissionError. var response *httptest.ResponseRecorder - response = IssueRequest(rest, unauth_req) + response = IssueRequest(unauth_req) ExpectStatusCode(t, "unauthenticated request", PermissionError.HTTPCode, response) // Authenticated non-admin request returns PermissionError. - response = IssueRequest(rest, user_req) + response = IssueRequest(user_req) ExpectStatusCode(t, "authenticated non-admin request", PermissionError.HTTPCode, @@ -507,7 +493,7 @@ func TestDeleteHandler(t *testing.T) { } var response_dc, expected_dc deletecounter - response = IssueRequest(rest, superuser_nonexistent_block_req) + response = IssueRequest(superuser_nonexistent_block_req) ExpectStatusCode(t, "data manager request, nonexistent block", http.StatusNotFound, @@ -515,7 +501,7 @@ func TestDeleteHandler(t *testing.T) { // Authenticated admin request for existing block while never_delete is set. never_delete = true - response = IssueRequest(rest, superuser_existing_block_req) + response = IssueRequest(superuser_existing_block_req) ExpectStatusCode(t, "authenticated request, existing block, method disabled", MethodDisabledError.HTTPCode, @@ -523,7 +509,7 @@ func TestDeleteHandler(t *testing.T) { never_delete = false // Authenticated admin request for existing block. - response = IssueRequest(rest, superuser_existing_block_req) + response = IssueRequest(superuser_existing_block_req) ExpectStatusCode(t, "data manager request, existing block", http.StatusOK, @@ -542,12 +528,12 @@ func TestDeleteHandler(t *testing.T) { t.Error("superuser_existing_block_req: block not deleted") } - // A DELETE request on a block newer than permission_ttl should return - // success but leave the block on the volume. + // A DELETE request on a block newer than blob_signature_ttl + // should return success but leave the block on the volume. vols[0].Put(TEST_HASH, TEST_BLOCK) - permission_ttl = time.Duration(1) * time.Hour + blob_signature_ttl = time.Hour - response = IssueRequest(rest, superuser_existing_block_req) + response = IssueRequest(superuser_existing_block_req) ExpectStatusCode(t, "data manager request, existing block", http.StatusOK, @@ -596,12 +582,11 @@ func TestDeleteHandler(t *testing.T) { func TestPullHandler(t *testing.T) { defer teardown() - // Set up a REST router for testing the handlers. - rest := MakeRESTRouter() - var user_token = "USER TOKEN" data_manager_token = "DATA MANAGER TOKEN" + pullq = NewWorkQueue() + good_json := []byte(`[ { "locator":"locator_with_two_servers", @@ -651,12 +636,12 @@ func TestPullHandler(t *testing.T) { "Invalid pull request from the data manager", RequestTester{"/pull", data_manager_token, "PUT", bad_json}, http.StatusBadRequest, - "Bad Request\n", + "", }, } for _, tst := range testcases { - response := IssueRequest(rest, &tst.req) + response := IssueRequest(&tst.req) ExpectStatusCode(t, tst.name, tst.response_code, response) ExpectBody(t, tst.name, tst.response_body, response) } @@ -703,12 +688,11 @@ func TestPullHandler(t *testing.T) { func TestTrashHandler(t *testing.T) { defer teardown() - // Set up a REST router for testing the handlers. - rest := MakeRESTRouter() - var user_token = "USER TOKEN" data_manager_token = "DATA MANAGER TOKEN" + trashq = NewWorkQueue() + good_json := []byte(`[ { "locator":"block1", @@ -756,12 +740,12 @@ func TestTrashHandler(t *testing.T) { "Invalid trash list from the data manager", RequestTester{"/trash", data_manager_token, "PUT", bad_json}, http.StatusBadRequest, - "Bad Request\n", + "", }, } for _, tst := range testcases { - response := IssueRequest(rest, &tst.req) + response := IssueRequest(&tst.req) ExpectStatusCode(t, tst.name, tst.response_code, response) ExpectBody(t, tst.name, tst.response_body, response) } @@ -783,15 +767,16 @@ func TestTrashHandler(t *testing.T) { // ==================== // IssueTestRequest executes an HTTP request described by rt, to a -// specified REST router. It returns the HTTP response to the request. -func IssueRequest(router *mux.Router, rt *RequestTester) *httptest.ResponseRecorder { +// REST router. It returns the HTTP response to the request. +func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder { response := httptest.NewRecorder() body := bytes.NewReader(rt.request_body) req, _ := http.NewRequest(rt.method, rt.uri, body) if rt.api_token != "" { req.Header.Set("Authorization", "OAuth2 "+rt.api_token) } - router.ServeHTTP(response, req) + loggingRouter := MakeLoggingRESTRouter() + loggingRouter.ServeHTTP(response, req) return response } @@ -803,7 +788,7 @@ func ExpectStatusCode( expected_status int, response *httptest.ResponseRecorder) { if response.Code != expected_status { - t.Errorf("%s: expected status %s, got %+v", + t.Errorf("%s: expected status %d, got %+v", testname, expected_status, response) } } @@ -813,8 +798,87 @@ func ExpectBody( testname string, expected_body string, response *httptest.ResponseRecorder) { - if response.Body.String() != expected_body { + if expected_body != "" && response.Body.String() != expected_body { t.Errorf("%s: expected response body '%s', got %+v", testname, expected_body, response) } } + +// Invoke the PutBlockHandler a bunch of times to test for bufferpool resource +// leak. +func TestPutHandlerNoBufferleak(t *testing.T) { + defer teardown() + + // Prepare two test Keep volumes. + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + + ok := make(chan bool) + go func() { + for i := 0; i < maxBuffers+1; i += 1 { + // Unauthenticated request, no server key + // => OK (unsigned response) + unsigned_locator := "/" + TEST_HASH + response := IssueRequest( + &RequestTester{ + method: "PUT", + uri: unsigned_locator, + request_body: TEST_BLOCK, + }) + ExpectStatusCode(t, + "TestPutHandlerBufferleak", http.StatusOK, response) + ExpectBody(t, + "TestPutHandlerBufferleak", + TEST_HASH_PUT_RESPONSE, response) + } + ok <- true + }() + select { + case <-time.After(20 * time.Second): + // If the buffer pool leaks, the test goroutine hangs. + t.Fatal("test did not finish, assuming pool leaked") + case <-ok: + } +} + +// Invoke the GetBlockHandler a bunch of times to test for bufferpool resource +// leak. +func TestGetHandlerNoBufferleak(t *testing.T) { + defer teardown() + + // Prepare two test Keep volumes. Our block is stored on the second volume. + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + + vols := KeepVM.AllWritable() + if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil { + t.Error(err) + } + + ok := make(chan bool) + go func() { + for i := 0; i < maxBuffers+1; i += 1 { + // Unauthenticated request, unsigned locator + // => OK + unsigned_locator := "/" + TEST_HASH + response := IssueRequest( + &RequestTester{ + method: "GET", + uri: unsigned_locator, + }) + ExpectStatusCode(t, + "Unauthenticated request, unsigned locator", http.StatusOK, response) + ExpectBody(t, + "Unauthenticated request, unsigned locator", + string(TEST_BLOCK), + response) + } + ok <- true + }() + select { + case <-time.After(20 * time.Second): + // If the buffer pool leaks, the test goroutine hangs. + t.Fatal("test did not finish, assuming pool leaked") + case <-ok: + } +}