X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fb9235f1b9abe0661eb7640c4db0ab7001f90f1d..554fe927169e928d91c2d8c4bed158aef4d4d746:/services/keepstore/handler_test.go diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 0cfa1f30dd..40b4839e06 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -11,9 +11,9 @@ package main import ( "bytes" + "context" "encoding/json" "fmt" - "github.com/gorilla/mux" "net/http" "net/http/httptest" "os" @@ -21,15 +21,17 @@ import ( "strings" "testing" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" ) // A RequestTester represents the parameters for an HTTP request to // be issued on behalf of a unit test. type RequestTester struct { - uri string - api_token string - method string - request_body []byte + uri string + apiToken string + method string + requestBody []byte } // Test GetBlockHandler on the following situations: @@ -44,88 +46,87 @@ func TestGetHandler(t *testing.T) { // Prepare two test Keep volumes. Our block is stored on the second volume. KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Quit() + defer KeepVM.Close() - vols := KeepVM.Volumes() - if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil { + vols := KeepVM.AllWritable() + if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil { t.Error(err) } - // Set up a REST router for testing the handlers. - rest := MakeRESTRouter() - // Create locators for testing. // Turn on permission settings so we can generate signed locators. 
- enforce_permissions = true - PermissionSecret = []byte(known_key) - permission_ttl = time.Duration(300) * time.Second + theConfig.RequireSignatures = true + theConfig.blobSigningKey = []byte(knownKey) + theConfig.BlobSignatureTTL.Set("5m") var ( - unsigned_locator = "/" + TEST_HASH - valid_timestamp = time.Now().Add(permission_ttl) - expired_timestamp = time.Now().Add(-time.Hour) - signed_locator = "/" + SignLocator(TEST_HASH, known_token, valid_timestamp) - expired_locator = "/" + SignLocator(TEST_HASH, known_token, expired_timestamp) + unsignedLocator = "/" + TestHash + validTimestamp = time.Now().Add(theConfig.BlobSignatureTTL.Duration()) + expiredTimestamp = time.Now().Add(-time.Hour) + signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp) + expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp) ) // ----------------- // Test unauthenticated request with permissions off. - enforce_permissions = false + theConfig.RequireSignatures = false // Unauthenticated request, unsigned locator // => OK - response := IssueRequest(rest, + response := IssueRequest( &RequestTester{ method: "GET", - uri: unsigned_locator, + uri: unsignedLocator, }) ExpectStatusCode(t, "Unauthenticated request, unsigned locator", http.StatusOK, response) ExpectBody(t, "Unauthenticated request, unsigned locator", - string(TEST_BLOCK), + string(TestBlock), response) - received_xbs := response.Header().Get("X-Block-Size") - expected_xbs := fmt.Sprintf("%d", len(TEST_BLOCK)) - if received_xbs != expected_xbs { - t.Errorf("expected X-Block-Size %s, got %s", expected_xbs, received_xbs) + + receivedLen := response.Header().Get("Content-Length") + expectedLen := fmt.Sprintf("%d", len(TestBlock)) + if receivedLen != expectedLen { + t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen) } // ---------------- // Permissions: on. 
- enforce_permissions = true + theConfig.RequireSignatures = true // Authenticated request, signed locator // => OK - response = IssueRequest(rest, &RequestTester{ - method: "GET", - uri: signed_locator, - api_token: known_token, + response = IssueRequest(&RequestTester{ + method: "GET", + uri: signedLocator, + apiToken: knownToken, }) ExpectStatusCode(t, "Authenticated request, signed locator", http.StatusOK, response) ExpectBody(t, - "Authenticated request, signed locator", string(TEST_BLOCK), response) - received_xbs = response.Header().Get("X-Block-Size") - expected_xbs = fmt.Sprintf("%d", len(TEST_BLOCK)) - if received_xbs != expected_xbs { - t.Errorf("expected X-Block-Size %s, got %s", expected_xbs, received_xbs) + "Authenticated request, signed locator", string(TestBlock), response) + + receivedLen = response.Header().Get("Content-Length") + expectedLen = fmt.Sprintf("%d", len(TestBlock)) + if receivedLen != expectedLen { + t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen) } // Authenticated request, unsigned locator // => PermissionError - response = IssueRequest(rest, &RequestTester{ - method: "GET", - uri: unsigned_locator, - api_token: known_token, + response = IssueRequest(&RequestTester{ + method: "GET", + uri: unsignedLocator, + apiToken: knownToken, }) ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response) // Unauthenticated request, signed locator // => PermissionError - response = IssueRequest(rest, &RequestTester{ + response = IssueRequest(&RequestTester{ method: "GET", - uri: signed_locator, + uri: signedLocator, }) ExpectStatusCode(t, "Unauthenticated request, signed locator", @@ -133,10 +134,10 @@ func TestGetHandler(t *testing.T) { // Authenticated request, expired locator // => ExpiredError - response = IssueRequest(rest, &RequestTester{ - method: "GET", - uri: expired_locator, - api_token: known_token, + response = IssueRequest(&RequestTester{ + method: "GET", + uri: expiredLocator, + apiToken: 
knownToken, }) ExpectStatusCode(t, "Authenticated request, expired locator", @@ -153,66 +154,63 @@ func TestPutHandler(t *testing.T) { // Prepare two test Keep volumes. KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Quit() - - // Set up a REST router for testing the handlers. - rest := MakeRESTRouter() + defer KeepVM.Close() // -------------- // No server key. // Unauthenticated request, no server key // => OK (unsigned response) - unsigned_locator := "/" + TEST_HASH - response := IssueRequest(rest, + unsignedLocator := "/" + TestHash + response := IssueRequest( &RequestTester{ - method: "PUT", - uri: unsigned_locator, - request_body: TEST_BLOCK, + method: "PUT", + uri: unsignedLocator, + requestBody: TestBlock, }) ExpectStatusCode(t, "Unauthenticated request, no server key", http.StatusOK, response) ExpectBody(t, "Unauthenticated request, no server key", - TEST_HASH_PUT_RESPONSE, response) + TestHashPutResp, response) // ------------------ // With a server key. - PermissionSecret = []byte(known_key) - permission_ttl = time.Duration(300) * time.Second + theConfig.blobSigningKey = []byte(knownKey) + theConfig.BlobSignatureTTL.Set("5m") // When a permission key is available, the locator returned // from an authenticated PUT request will be signed. 
// Authenticated PUT, signed locator // => OK (signed response) - response = IssueRequest(rest, + response = IssueRequest( &RequestTester{ - method: "PUT", - uri: unsigned_locator, - request_body: TEST_BLOCK, - api_token: known_token, + method: "PUT", + uri: unsignedLocator, + requestBody: TestBlock, + apiToken: knownToken, }) ExpectStatusCode(t, "Authenticated PUT, signed locator, with server key", http.StatusOK, response) - response_locator := strings.TrimSpace(response.Body.String()) - if !VerifySignature(response_locator, known_token) { + responseLocator := strings.TrimSpace(response.Body.String()) + if VerifySignature(responseLocator, knownToken) != nil { t.Errorf("Authenticated PUT, signed locator, with server key:\n"+ "response '%s' does not contain a valid signature", - response_locator) + responseLocator) } // Unauthenticated PUT, unsigned locator // => OK - response = IssueRequest(rest, + response = IssueRequest( &RequestTester{ - method: "PUT", - uri: unsigned_locator, - request_body: TEST_BLOCK, + method: "PUT", + uri: unsignedLocator, + requestBody: TestBlock, }) ExpectStatusCode(t, @@ -220,25 +218,66 @@ func TestPutHandler(t *testing.T) { http.StatusOK, response) ExpectBody(t, "Unauthenticated PUT, unsigned locator, with server key", - TEST_HASH_PUT_RESPONSE, response) + TestHashPutResp, response) +} + +func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) { + defer teardown() + theConfig.systemAuthToken = "fake-data-manager-token" + vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()} + vols[0].Readonly = true + KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]}) + defer KeepVM.Close() + IssueRequest( + &RequestTester{ + method: "PUT", + uri: "/" + TestHash, + requestBody: TestBlock, + }) + defer func(orig bool) { + theConfig.EnableDelete = orig + }(theConfig.EnableDelete) + theConfig.EnableDelete = true + IssueRequest( + &RequestTester{ + method: "DELETE", + uri: "/" + TestHash, + requestBody: TestBlock, + apiToken: 
theConfig.systemAuthToken, + }) + type expect struct { + volnum int + method string + callcount int + } + for _, e := range []expect{ + {0, "Get", 0}, + {0, "Compare", 0}, + {0, "Touch", 0}, + {0, "Put", 0}, + {0, "Delete", 0}, + {1, "Get", 0}, + {1, "Compare", 1}, + {1, "Touch", 1}, + {1, "Put", 1}, + {1, "Delete", 1}, + } { + if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount { + t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount) + } + } } // Test /index requests: -// - enforce_permissions off | unauthenticated /index request -// - enforce_permissions off | unauthenticated /index/prefix request -// - enforce_permissions off | authenticated /index request | non-superuser -// - enforce_permissions off | authenticated /index/prefix request | non-superuser -// - enforce_permissions off | authenticated /index request | superuser -// - enforce_permissions off | authenticated /index/prefix request | superuser -// - enforce_permissions on | unauthenticated /index request -// - enforce_permissions on | unauthenticated /index/prefix request -// - enforce_permissions on | authenticated /index request | non-superuser -// - enforce_permissions on | authenticated /index/prefix request | non-superuser -// - enforce_permissions on | authenticated /index request | superuser -// - enforce_permissions on | authenticated /index/prefix request | superuser +// - unauthenticated /index request +// - unauthenticated /index/prefix request +// - authenticated /index request | non-superuser +// - authenticated /index/prefix request | non-superuser +// - authenticated /index request | superuser +// - authenticated /index/prefix request | superuser // // The only /index requests that should succeed are those issued by the -// superuser when enforce_permissions = true. +// superuser. They should pass regardless of the value of RequireSignatures. 
// func TestIndexHandler(t *testing.T) { defer teardown() @@ -247,148 +286,118 @@ func TestIndexHandler(t *testing.T) { // Include multiple blocks on different volumes, and // some metadata files (which should be omitted from index listings) KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Quit() + defer KeepVM.Close() - vols := KeepVM.Volumes() - vols[0].Put(TEST_HASH, TEST_BLOCK) - vols[1].Put(TEST_HASH_2, TEST_BLOCK_2) - vols[0].Put(TEST_HASH+".meta", []byte("metadata")) - vols[1].Put(TEST_HASH_2+".meta", []byte("metadata")) + vols := KeepVM.AllWritable() + vols[0].Put(context.Background(), TestHash, TestBlock) + vols[1].Put(context.Background(), TestHash2, TestBlock2) + vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata")) + vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata")) - // Set up a REST router for testing the handlers. - rest := MakeRESTRouter() + theConfig.systemAuthToken = "DATA MANAGER TOKEN" - data_manager_token = "DATA MANAGER TOKEN" - - unauthenticated_req := &RequestTester{ + unauthenticatedReq := &RequestTester{ method: "GET", uri: "/index", } - authenticated_req := &RequestTester{ - method: "GET", - uri: "/index", - api_token: known_token, + authenticatedReq := &RequestTester{ + method: "GET", + uri: "/index", + apiToken: knownToken, } - superuser_req := &RequestTester{ - method: "GET", - uri: "/index", - api_token: data_manager_token, + superuserReq := &RequestTester{ + method: "GET", + uri: "/index", + apiToken: theConfig.systemAuthToken, } - unauth_prefix_req := &RequestTester{ + unauthPrefixReq := &RequestTester{ method: "GET", - uri: "/index/" + TEST_HASH[0:3], + uri: "/index/" + TestHash[0:3], } - auth_prefix_req := &RequestTester{ - method: "GET", - uri: "/index/" + TEST_HASH[0:3], - api_token: known_token, + authPrefixReq := &RequestTester{ + method: "GET", + uri: "/index/" + TestHash[0:3], + apiToken: knownToken, } - superuser_prefix_req := &RequestTester{ - method: "GET", - uri: "/index/" + 
TEST_HASH[0:3], - api_token: data_manager_token, + superuserPrefixReq := &RequestTester{ + method: "GET", + uri: "/index/" + TestHash[0:3], + apiToken: theConfig.systemAuthToken, + } + superuserNoSuchPrefixReq := &RequestTester{ + method: "GET", + uri: "/index/abcd", + apiToken: theConfig.systemAuthToken, + } + superuserInvalidPrefixReq := &RequestTester{ + method: "GET", + uri: "/index/xyz", + apiToken: theConfig.systemAuthToken, } - // ---------------------------- - // enforce_permissions disabled - // All /index requests should fail. - enforce_permissions = false - - // unauthenticated /index request - // => PermissionError - response := IssueRequest(rest, unauthenticated_req) - ExpectStatusCode(t, - "enforce_permissions off, unauthenticated request", - PermissionError.HTTPCode, - response) - - // unauthenticated /index/prefix request - // => PermissionError - response = IssueRequest(rest, unauth_prefix_req) - ExpectStatusCode(t, - "enforce_permissions off, unauthenticated /index/prefix request", - PermissionError.HTTPCode, - response) - - // authenticated /index request, non-superuser - // => PermissionError - response = IssueRequest(rest, authenticated_req) - ExpectStatusCode(t, - "enforce_permissions off, authenticated request, non-superuser", - PermissionError.HTTPCode, - response) - - // authenticated /index/prefix request, non-superuser - // => PermissionError - response = IssueRequest(rest, auth_prefix_req) - ExpectStatusCode(t, - "enforce_permissions off, authenticated /index/prefix request, non-superuser", - PermissionError.HTTPCode, - response) - - // authenticated /index request, superuser - // => PermissionError - response = IssueRequest(rest, superuser_req) - ExpectStatusCode(t, - "enforce_permissions off, superuser request", - PermissionError.HTTPCode, - response) - - // superuser /index/prefix request - // => PermissionError - response = IssueRequest(rest, superuser_prefix_req) - ExpectStatusCode(t, - "enforce_permissions off, superuser 
/index/prefix request", - PermissionError.HTTPCode, - response) + // ------------------------------------------------------------- + // Only the superuser should be allowed to issue /index requests. // --------------------------- - // enforce_permissions enabled - // Only the superuser should be allowed to issue /index requests. - enforce_permissions = true + // RequireSignatures enabled + // This setting should not affect tests passing. + theConfig.RequireSignatures = true // unauthenticated /index request - // => PermissionError - response = IssueRequest(rest, unauthenticated_req) + // => UnauthorizedError + response := IssueRequest(unauthenticatedReq) ExpectStatusCode(t, - "enforce_permissions on, unauthenticated request", - PermissionError.HTTPCode, + "RequireSignatures on, unauthenticated request", + UnauthorizedError.HTTPCode, response) // unauthenticated /index/prefix request - // => PermissionError - response = IssueRequest(rest, unauth_prefix_req) + // => UnauthorizedError + response = IssueRequest(unauthPrefixReq) ExpectStatusCode(t, "permissions on, unauthenticated /index/prefix request", - PermissionError.HTTPCode, + UnauthorizedError.HTTPCode, response) // authenticated /index request, non-superuser - // => PermissionError - response = IssueRequest(rest, authenticated_req) + // => UnauthorizedError + response = IssueRequest(authenticatedReq) ExpectStatusCode(t, "permissions on, authenticated request, non-superuser", - PermissionError.HTTPCode, + UnauthorizedError.HTTPCode, response) // authenticated /index/prefix request, non-superuser - // => PermissionError - response = IssueRequest(rest, auth_prefix_req) + // => UnauthorizedError + response = IssueRequest(authPrefixReq) ExpectStatusCode(t, "permissions on, authenticated /index/prefix request, non-superuser", - PermissionError.HTTPCode, + UnauthorizedError.HTTPCode, response) // superuser /index request // => OK - response = IssueRequest(rest, superuser_req) + response = IssueRequest(superuserReq) 
ExpectStatusCode(t, "permissions on, superuser request", http.StatusOK, response) - expected := `^` + TEST_HASH + `\+\d+ \d+\n` + - TEST_HASH_2 + `\+\d+ \d+\n$` + // ---------------------------- + // RequireSignatures disabled + // Valid Request should still pass. + theConfig.RequireSignatures = false + + // superuser /index request + // => OK + response = IssueRequest(superuserReq) + ExpectStatusCode(t, + "permissions on, superuser request", + http.StatusOK, + response) + + expected := `^` + TestHash + `\+\d+ \d+\n` + + TestHash2 + `\+\d+ \d+\n\n$` match, _ := regexp.MatchString(expected, response.Body.String()) if !match { t.Errorf( @@ -398,19 +407,39 @@ func TestIndexHandler(t *testing.T) { // superuser /index/prefix request // => OK - response = IssueRequest(rest, superuser_prefix_req) + response = IssueRequest(superuserPrefixReq) ExpectStatusCode(t, "permissions on, superuser request", http.StatusOK, response) - expected = `^` + TEST_HASH + `\+\d+ \d+\n$` + expected = `^` + TestHash + `\+\d+ \d+\n\n$` match, _ = regexp.MatchString(expected, response.Body.String()) if !match { t.Errorf( "permissions on, superuser /index/prefix request: expected %s, got:\n%s", expected, response.Body.String()) } + + // superuser /index/{no-such-prefix} request + // => OK + response = IssueRequest(superuserNoSuchPrefixReq) + ExpectStatusCode(t, + "permissions on, superuser request", + http.StatusOK, + response) + + if "\n" != response.Body.String() { + t.Errorf("Expected empty response for %s. 
Found %s", superuserNoSuchPrefixReq.uri, response.Body.String()) + } + + // superuser /index/{invalid-prefix} request + // => StatusBadRequest + response = IssueRequest(superuserInvalidPrefixReq) + ExpectStatusCode(t, + "permissions on, superuser request", + http.StatusBadRequest, + response) } // TestDeleteHandler @@ -446,55 +475,54 @@ func TestDeleteHandler(t *testing.T) { // Include multiple blocks on different volumes, and // some metadata files (which should be omitted from index listings) KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Quit() + defer KeepVM.Close() - vols := KeepVM.Volumes() - vols[0].Put(TEST_HASH, TEST_BLOCK) + vols := KeepVM.AllWritable() + vols[0].Put(context.Background(), TestHash, TestBlock) - // Explicitly set the permission_ttl to 0 for these + // Explicitly set the BlobSignatureTTL to 0 for these // tests, to ensure the MockVolume deletes the blocks // even though they have just been created. - permission_ttl = time.Duration(0) + theConfig.BlobSignatureTTL = arvados.Duration(0) - // Set up a REST router for testing the handlers. 
- rest := MakeRESTRouter() + var userToken = "NOT DATA MANAGER TOKEN" + theConfig.systemAuthToken = "DATA MANAGER TOKEN" - var user_token = "NOT DATA MANAGER TOKEN" - data_manager_token = "DATA MANAGER TOKEN" + theConfig.EnableDelete = true - unauth_req := &RequestTester{ + unauthReq := &RequestTester{ method: "DELETE", - uri: "/" + TEST_HASH, + uri: "/" + TestHash, } - user_req := &RequestTester{ - method: "DELETE", - uri: "/" + TEST_HASH, - api_token: user_token, + userReq := &RequestTester{ + method: "DELETE", + uri: "/" + TestHash, + apiToken: userToken, } - superuser_existing_block_req := &RequestTester{ - method: "DELETE", - uri: "/" + TEST_HASH, - api_token: data_manager_token, + superuserExistingBlockReq := &RequestTester{ + method: "DELETE", + uri: "/" + TestHash, + apiToken: theConfig.systemAuthToken, } - superuser_nonexistent_block_req := &RequestTester{ - method: "DELETE", - uri: "/" + TEST_HASH_2, - api_token: data_manager_token, + superuserNonexistentBlockReq := &RequestTester{ + method: "DELETE", + uri: "/" + TestHash2, + apiToken: theConfig.systemAuthToken, } // Unauthenticated request returns PermissionError. var response *httptest.ResponseRecorder - response = IssueRequest(rest, unauth_req) + response = IssueRequest(unauthReq) ExpectStatusCode(t, "unauthenticated request", PermissionError.HTTPCode, response) // Authenticated non-admin request returns PermissionError. 
- response = IssueRequest(rest, user_req) + response = IssueRequest(userReq) ExpectStatusCode(t, "authenticated non-admin request", PermissionError.HTTPCode, @@ -505,62 +533,63 @@ func TestDeleteHandler(t *testing.T) { Deleted int `json:"copies_deleted"` Failed int `json:"copies_failed"` } - var response_dc, expected_dc deletecounter + var responseDc, expectedDc deletecounter - response = IssueRequest(rest, superuser_nonexistent_block_req) + response = IssueRequest(superuserNonexistentBlockReq) ExpectStatusCode(t, "data manager request, nonexistent block", http.StatusNotFound, response) - // Authenticated admin request for existing block while never_delete is set. - never_delete = true - response = IssueRequest(rest, superuser_existing_block_req) + // Authenticated admin request for existing block while EnableDelete is false. + theConfig.EnableDelete = false + response = IssueRequest(superuserExistingBlockReq) ExpectStatusCode(t, "authenticated request, existing block, method disabled", MethodDisabledError.HTTPCode, response) - never_delete = false + theConfig.EnableDelete = true // Authenticated admin request for existing block. 
- response = IssueRequest(rest, superuser_existing_block_req) + response = IssueRequest(superuserExistingBlockReq) ExpectStatusCode(t, "data manager request, existing block", http.StatusOK, response) // Expect response {"copies_deleted":1,"copies_failed":0} - expected_dc = deletecounter{1, 0} - json.NewDecoder(response.Body).Decode(&response_dc) - if response_dc != expected_dc { - t.Errorf("superuser_existing_block_req\nexpected: %+v\nreceived: %+v", - expected_dc, response_dc) + expectedDc = deletecounter{1, 0} + json.NewDecoder(response.Body).Decode(&responseDc) + if responseDc != expectedDc { + t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v", + expectedDc, responseDc) } // Confirm the block has been deleted - _, err := vols[0].Get(TEST_HASH) - var block_deleted = os.IsNotExist(err) - if !block_deleted { - t.Error("superuser_existing_block_req: block not deleted") + buf := make([]byte, BlockSize) + _, err := vols[0].Get(context.Background(), TestHash, buf) + var blockDeleted = os.IsNotExist(err) + if !blockDeleted { + t.Error("superuserExistingBlockReq: block not deleted") } - // A DELETE request on a block newer than permission_ttl should return - // success but leave the block on the volume. - vols[0].Put(TEST_HASH, TEST_BLOCK) - permission_ttl = time.Duration(1) * time.Hour + // A DELETE request on a block newer than BlobSignatureTTL + // should return success but leave the block on the volume. 
+ vols[0].Put(context.Background(), TestHash, TestBlock) + theConfig.BlobSignatureTTL = arvados.Duration(time.Hour) - response = IssueRequest(rest, superuser_existing_block_req) + response = IssueRequest(superuserExistingBlockReq) ExpectStatusCode(t, "data manager request, existing block", http.StatusOK, response) // Expect response {"copies_deleted":1,"copies_failed":0} - expected_dc = deletecounter{1, 0} - json.NewDecoder(response.Body).Decode(&response_dc) - if response_dc != expected_dc { - t.Errorf("superuser_existing_block_req\nexpected: %+v\nreceived: %+v", - expected_dc, response_dc) + expectedDc = deletecounter{1, 0} + json.NewDecoder(response.Body).Decode(&responseDc) + if responseDc != expectedDc { + t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v", + expectedDc, responseDc) } // Confirm the block has NOT been deleted. - _, err = vols[0].Get(TEST_HASH) + _, err = vols[0].Get(context.Background(), TestHash, buf) if err != nil { t.Errorf("testing delete on new block: %s\n", err) } @@ -596,13 +625,12 @@ func TestDeleteHandler(t *testing.T) { func TestPullHandler(t *testing.T) { defer teardown() - // Set up a REST router for testing the handlers. 
- rest := MakeRESTRouter() + var userToken = "USER TOKEN" + theConfig.systemAuthToken = "DATA MANAGER TOKEN" - var user_token = "USER TOKEN" - data_manager_token = "DATA MANAGER TOKEN" + pullq = NewWorkQueue() - good_json := []byte(`[ + goodJSON := []byte(`[ { "locator":"locator_with_two_servers", "servers":[ @@ -620,58 +648,161 @@ func TestPullHandler(t *testing.T) { } ]`) - bad_json := []byte(`{ "key":"I'm a little teapot" }`) + badJSON := []byte(`{ "key":"I'm a little teapot" }`) type pullTest struct { - name string - req RequestTester - response_code int - response_body string + name string + req RequestTester + responseCode int + responseBody string } var testcases = []pullTest{ { - "user token, good request", - RequestTester{"/pull", user_token, "PUT", good_json}, + "Valid pull list from an ordinary user", + RequestTester{"/pull", userToken, "PUT", goodJSON}, http.StatusUnauthorized, "Unauthorized\n", }, { - "user token, bad request", - RequestTester{"/pull", user_token, "PUT", bad_json}, + "Invalid pull request from an ordinary user", + RequestTester{"/pull", userToken, "PUT", badJSON}, http.StatusUnauthorized, "Unauthorized\n", }, { - "data manager token, good request", - RequestTester{"/pull", data_manager_token, "PUT", good_json}, + "Valid pull request from the data manager", + RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON}, http.StatusOK, "Received 3 pull requests\n", }, { - "data manager token, bad request", - RequestTester{"/pull", data_manager_token, "PUT", bad_json}, + "Invalid pull request from the data manager", + RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON}, http.StatusBadRequest, - "Bad Request\n", + "", }, } for _, tst := range testcases { - response := IssueRequest(rest, &tst.req) - ExpectStatusCode(t, tst.name, tst.response_code, response) - ExpectBody(t, tst.name, tst.response_body, response) + response := IssueRequest(&tst.req) + ExpectStatusCode(t, tst.name, tst.responseCode, response) + 
ExpectBody(t, tst.name, tst.responseBody, response) } // The Keep pull manager should have received one good list with 3 // requests on it. - var output_list = make([]PullRequest, 3) for i := 0; i < 3; i++ { item := <-pullq.NextItem - if pr, ok := item.(PullRequest); ok { - output_list[i] = pr - } else { + if _, ok := item.(PullRequest); !ok { t.Errorf("item %v could not be parsed as a PullRequest", item) } } + + expectChannelEmpty(t, pullq.NextItem) +} + +// TestTrashHandler +// +// Test cases: +// +// Cases tested: syntactically valid and invalid trash lists, from the +// data manager and from unprivileged users: +// +// 1. Valid trash list from an ordinary user +// (expected result: 401 Unauthorized) +// +// 2. Invalid trash list from an ordinary user +// (expected result: 401 Unauthorized) +// +// 3. Valid trash list from the data manager +// (expected result: 200 OK with request body "Received 3 trash +// requests" +// +// 4. Invalid trash list from the data manager +// (expected result: 400 Bad Request) +// +// Test that in the end, the trash collector received a good list +// trash list with the expected number of requests. +// +// TODO(twp): test concurrency: launch 100 goroutines to update the +// pull list simultaneously. Make sure that none of them return 400 +// Bad Request and that replica.Dump() returns a valid list. 
+// +func TestTrashHandler(t *testing.T) { + defer teardown() + + var userToken = "USER TOKEN" + theConfig.systemAuthToken = "DATA MANAGER TOKEN" + + trashq = NewWorkQueue() + + goodJSON := []byte(`[ + { + "locator":"block1", + "block_mtime":1409082153 + }, + { + "locator":"block2", + "block_mtime":1409082153 + }, + { + "locator":"block3", + "block_mtime":1409082153 + } + ]`) + + badJSON := []byte(`I am not a valid JSON string`) + + type trashTest struct { + name string + req RequestTester + responseCode int + responseBody string + } + + var testcases = []trashTest{ + { + "Valid trash list from an ordinary user", + RequestTester{"/trash", userToken, "PUT", goodJSON}, + http.StatusUnauthorized, + "Unauthorized\n", + }, + { + "Invalid trash list from an ordinary user", + RequestTester{"/trash", userToken, "PUT", badJSON}, + http.StatusUnauthorized, + "Unauthorized\n", + }, + { + "Valid trash list from the data manager", + RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON}, + http.StatusOK, + "Received 3 trash requests\n", + }, + { + "Invalid trash list from the data manager", + RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON}, + http.StatusBadRequest, + "", + }, + } + + for _, tst := range testcases { + response := IssueRequest(&tst.req) + ExpectStatusCode(t, tst.name, tst.responseCode, response) + ExpectBody(t, tst.name, tst.responseBody, response) + } + + // The trash collector should have received one good list with 3 + // requests on it. + for i := 0; i < 3; i++ { + item := <-trashq.NextItem + if _, ok := item.(TrashRequest); !ok { + t.Errorf("item %v could not be parsed as a TrashRequest", item) + } + } + + expectChannelEmpty(t, trashq.NextItem) } // ==================== @@ -679,15 +810,16 @@ func TestPullHandler(t *testing.T) { // ==================== // IssueTestRequest executes an HTTP request described by rt, to a -// specified REST router. It returns the HTTP response to the request. 
-func IssueRequest(router *mux.Router, rt *RequestTester) *httptest.ResponseRecorder { +// REST router. It returns the HTTP response to the request. +func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder { response := httptest.NewRecorder() - body := bytes.NewReader(rt.request_body) + body := bytes.NewReader(rt.requestBody) req, _ := http.NewRequest(rt.method, rt.uri, body) - if rt.api_token != "" { - req.Header.Set("Authorization", "OAuth2 "+rt.api_token) + if rt.apiToken != "" { + req.Header.Set("Authorization", "OAuth2 "+rt.apiToken) } - router.ServeHTTP(response, req) + loggingRouter := MakeRESTRouter() + loggingRouter.ServeHTTP(response, req) return response } @@ -696,21 +828,311 @@ func IssueRequest(router *mux.Router, rt *RequestTester) *httptest.ResponseRecor func ExpectStatusCode( t *testing.T, testname string, - expected_status int, + expectedStatus int, response *httptest.ResponseRecorder) { - if response.Code != expected_status { - t.Errorf("%s: expected status %s, got %+v", - testname, expected_status, response) + if response.Code != expectedStatus { + t.Errorf("%s: expected status %d, got %+v", + testname, expectedStatus, response) } } func ExpectBody( t *testing.T, testname string, - expected_body string, + expectedBody string, response *httptest.ResponseRecorder) { - if response.Body.String() != expected_body { + if expectedBody != "" && response.Body.String() != expectedBody { t.Errorf("%s: expected response body '%s', got %+v", - testname, expected_body, response) + testname, expectedBody, response) + } +} + +// See #7121 +func TestPutNeedsOnlyOneBuffer(t *testing.T) { + defer teardown() + KeepVM = MakeTestVolumeManager(1) + defer KeepVM.Close() + + defer func(orig *bufferPool) { + bufs = orig + }(bufs) + bufs = newBufferPool(1, BlockSize) + + ok := make(chan struct{}) + go func() { + for i := 0; i < 2; i++ { + response := IssueRequest( + &RequestTester{ + method: "PUT", + uri: "/" + TestHash, + requestBody: TestBlock, + }) + 
ExpectStatusCode(t, + "TestPutNeedsOnlyOneBuffer", http.StatusOK, response) + } + ok <- struct{}{} + }() + + select { + case <-ok: + case <-time.After(time.Second): + t.Fatal("PUT deadlocks with MaxBuffers==1") + } +} + +// Invoke the PutBlockHandler a bunch of times to test for bufferpool resource +// leak. +func TestPutHandlerNoBufferleak(t *testing.T) { + defer teardown() + + // Prepare two test Keep volumes. + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + + ok := make(chan bool) + go func() { + for i := 0; i < theConfig.MaxBuffers+1; i++ { + // Unauthenticated request, no server key + // => OK (unsigned response) + unsignedLocator := "/" + TestHash + response := IssueRequest( + &RequestTester{ + method: "PUT", + uri: unsignedLocator, + requestBody: TestBlock, + }) + ExpectStatusCode(t, + "TestPutHandlerBufferleak", http.StatusOK, response) + ExpectBody(t, + "TestPutHandlerBufferleak", + TestHashPutResp, response) + } + ok <- true + }() + select { + case <-time.After(20 * time.Second): + // If the buffer pool leaks, the test goroutine hangs. 
+ t.Fatal("test did not finish, assuming pool leaked") + case <-ok: + } +} + +type notifyingResponseRecorder struct { + *httptest.ResponseRecorder + closer chan bool +} + +func (r *notifyingResponseRecorder) CloseNotify() <-chan bool { + return r.closer +} + +func TestGetHandlerClientDisconnect(t *testing.T) { + defer func(was bool) { + theConfig.RequireSignatures = was + }(theConfig.RequireSignatures) + theConfig.RequireSignatures = false + + defer func(orig *bufferPool) { + bufs = orig + }(bufs) + bufs = newBufferPool(1, BlockSize) + defer bufs.Put(bufs.Get(BlockSize)) + + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + + if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil { + t.Error(err) + } + + resp := &notifyingResponseRecorder{ + ResponseRecorder: httptest.NewRecorder(), + closer: make(chan bool, 1), + } + if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok { + t.Fatal("notifyingResponseRecorder is broken") + } + // If anyone asks, the client has disconnected. + resp.closer <- true + + ok := make(chan struct{}) + go func() { + req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil) + (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req) + ok <- struct{}{} + }() + + select { + case <-time.After(20 * time.Second): + t.Fatal("request took >20s, close notifier must be broken") + case <-ok: + } + + ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder) + for i, v := range KeepVM.AllWritable() { + if calls := v.(*MockVolume).called["GET"]; calls != 0 { + t.Errorf("volume %d got %d calls, expected 0", i, calls) + } + } +} + +// Invoke the GetBlockHandler a bunch of times to test for bufferpool resource +// leak. +func TestGetHandlerNoBufferLeak(t *testing.T) { + defer teardown() + + // Prepare two test Keep volumes. Our block is stored on the second volume. 
+ KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + + vols := KeepVM.AllWritable() + if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil { + t.Error(err) + } + + ok := make(chan bool) + go func() { + for i := 0; i < theConfig.MaxBuffers+1; i++ { + // Unauthenticated request, unsigned locator + // => OK + unsignedLocator := "/" + TestHash + response := IssueRequest( + &RequestTester{ + method: "GET", + uri: unsignedLocator, + }) + ExpectStatusCode(t, + "Unauthenticated request, unsigned locator", http.StatusOK, response) + ExpectBody(t, + "Unauthenticated request, unsigned locator", + string(TestBlock), + response) + } + ok <- true + }() + select { + case <-time.After(20 * time.Second): + // If the buffer pool leaks, the test goroutine hangs. + t.Fatal("test did not finish, assuming pool leaked") + case <-ok: + } +} + +func TestPutReplicationHeader(t *testing.T) { + defer teardown() + + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + + resp := IssueRequest(&RequestTester{ + method: "PUT", + uri: "/" + TestHash, + requestBody: TestBlock, + }) + if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" { + t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1") + } +} + +func TestUntrashHandler(t *testing.T) { + defer teardown() + + // Set up Keep volumes + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + vols := KeepVM.AllWritable() + vols[0].Put(context.Background(), TestHash, TestBlock) + + theConfig.systemAuthToken = "DATA MANAGER TOKEN" + + // unauthenticatedReq => UnauthorizedError + unauthenticatedReq := &RequestTester{ + method: "PUT", + uri: "/untrash/" + TestHash, + } + response := IssueRequest(unauthenticatedReq) + ExpectStatusCode(t, + "Unauthenticated request", + UnauthorizedError.HTTPCode, + response) + + // notDataManagerReq => UnauthorizedError + notDataManagerReq := &RequestTester{ + method: "PUT", + uri: "/untrash/" + TestHash, + apiToken: knownToken, } + + response = 
IssueRequest(notDataManagerReq) + ExpectStatusCode(t, + "Non-datamanager token", + UnauthorizedError.HTTPCode, + response) + + // datamanagerWithBadHashReq => StatusBadRequest + datamanagerWithBadHashReq := &RequestTester{ + method: "PUT", + uri: "/untrash/thisisnotalocator", + apiToken: theConfig.systemAuthToken, + } + response = IssueRequest(datamanagerWithBadHashReq) + ExpectStatusCode(t, + "Bad locator in untrash request", + http.StatusBadRequest, + response) + + // datamanagerWrongMethodReq => StatusBadRequest + datamanagerWrongMethodReq := &RequestTester{ + method: "GET", + uri: "/untrash/" + TestHash, + apiToken: theConfig.systemAuthToken, + } + response = IssueRequest(datamanagerWrongMethodReq) + ExpectStatusCode(t, + "Only PUT method is supported for untrash", + http.StatusBadRequest, + response) + + // datamanagerReq => StatusOK + datamanagerReq := &RequestTester{ + method: "PUT", + uri: "/untrash/" + TestHash, + apiToken: theConfig.systemAuthToken, + } + response = IssueRequest(datamanagerReq) + ExpectStatusCode(t, + "", + http.StatusOK, + response) + expected := "Successfully untrashed on: [MockVolume],[MockVolume]" + if response.Body.String() != expected { + t.Errorf( + "Untrash response mismatched: expected %s, got:\n%s", + expected, response.Body.String()) + } +} + +func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) { + defer teardown() + + // Set up readonly Keep volumes + vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()} + vols[0].Readonly = true + vols[1].Readonly = true + KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]}) + defer KeepVM.Close() + + theConfig.systemAuthToken = "DATA MANAGER TOKEN" + + // datamanagerReq => StatusOK + datamanagerReq := &RequestTester{ + method: "PUT", + uri: "/untrash/" + TestHash, + apiToken: theConfig.systemAuthToken, + } + response := IssueRequest(datamanagerReq) + ExpectStatusCode(t, + "No writable volumes", + http.StatusNotFound, + response) }