X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a50278e3d0e26bb5d513d0af5da2fb559b112388..4ed4b6554535849341673efb7f80392dd5fba946:/services/keepstore/handler_test.go diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 3817ea1900..40b4839e06 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -11,6 +11,7 @@ package main import ( "bytes" + "context" "encoding/json" "fmt" "net/http" @@ -20,6 +21,8 @@ import ( "strings" "testing" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" ) // A RequestTester represents the parameters for an HTTP request to @@ -46,19 +49,19 @@ func TestGetHandler(t *testing.T) { defer KeepVM.Close() vols := KeepVM.AllWritable() - if err := vols[0].Put(TestHash, TestBlock); err != nil { + if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil { t.Error(err) } // Create locators for testing. // Turn on permission settings so we can generate signed locators. - enforcePermissions = true - PermissionSecret = []byte(knownKey) - blobSignatureTTL = 300 * time.Second + theConfig.RequireSignatures = true + theConfig.blobSigningKey = []byte(knownKey) + theConfig.BlobSignatureTTL.Set("5m") var ( unsignedLocator = "/" + TestHash - validTimestamp = time.Now().Add(blobSignatureTTL) + validTimestamp = time.Now().Add(theConfig.BlobSignatureTTL.Duration()) expiredTimestamp = time.Now().Add(-time.Hour) signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp) expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp) @@ -66,7 +69,7 @@ func TestGetHandler(t *testing.T) { // ----------------- // Test unauthenticated request with permissions off. - enforcePermissions = false + theConfig.RequireSignatures = false // Unauthenticated request, unsigned locator // => OK @@ -90,7 +93,7 @@ func TestGetHandler(t *testing.T) { // ---------------- // Permissions: on. 
- enforcePermissions = true + theConfig.RequireSignatures = true // Authenticated request, signed locator // => OK @@ -175,8 +178,8 @@ func TestPutHandler(t *testing.T) { // ------------------ // With a server key. - PermissionSecret = []byte(knownKey) - blobSignatureTTL = 300 * time.Second + theConfig.blobSigningKey = []byte(knownKey) + theConfig.BlobSignatureTTL.Set("5m") // When a permission key is available, the locator returned // from an authenticated PUT request will be signed. @@ -220,7 +223,7 @@ func TestPutHandler(t *testing.T) { func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) { defer teardown() - dataManagerToken = "fake-data-manager-token" + theConfig.systemAuthToken = "fake-data-manager-token" vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()} vols[0].Readonly = true KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]}) @@ -232,15 +235,15 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) { requestBody: TestBlock, }) defer func(orig bool) { - neverDelete = orig - }(neverDelete) - neverDelete = false + theConfig.EnableDelete = orig + }(theConfig.EnableDelete) + theConfig.EnableDelete = true IssueRequest( &RequestTester{ method: "DELETE", uri: "/" + TestHash, requestBody: TestBlock, - apiToken: dataManagerToken, + apiToken: theConfig.systemAuthToken, }) type expect struct { volnum int @@ -274,7 +277,7 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) { // - authenticated /index/prefix request | superuser // // The only /index requests that should succeed are those issued by the -// superuser. They should pass regardless of the value of enforcePermissions. +// superuser. They should pass regardless of the value of RequireSignatures. 
// func TestIndexHandler(t *testing.T) { defer teardown() @@ -286,12 +289,12 @@ func TestIndexHandler(t *testing.T) { defer KeepVM.Close() vols := KeepVM.AllWritable() - vols[0].Put(TestHash, TestBlock) - vols[1].Put(TestHash2, TestBlock2) - vols[0].Put(TestHash+".meta", []byte("metadata")) - vols[1].Put(TestHash2+".meta", []byte("metadata")) + vols[0].Put(context.Background(), TestHash, TestBlock) + vols[1].Put(context.Background(), TestHash2, TestBlock2) + vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata")) + vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata")) - dataManagerToken = "DATA MANAGER TOKEN" + theConfig.systemAuthToken = "DATA MANAGER TOKEN" unauthenticatedReq := &RequestTester{ method: "GET", @@ -305,7 +308,7 @@ func TestIndexHandler(t *testing.T) { superuserReq := &RequestTester{ method: "GET", uri: "/index", - apiToken: dataManagerToken, + apiToken: theConfig.systemAuthToken, } unauthPrefixReq := &RequestTester{ method: "GET", @@ -319,32 +322,32 @@ func TestIndexHandler(t *testing.T) { superuserPrefixReq := &RequestTester{ method: "GET", uri: "/index/" + TestHash[0:3], - apiToken: dataManagerToken, + apiToken: theConfig.systemAuthToken, } superuserNoSuchPrefixReq := &RequestTester{ method: "GET", uri: "/index/abcd", - apiToken: dataManagerToken, + apiToken: theConfig.systemAuthToken, } superuserInvalidPrefixReq := &RequestTester{ method: "GET", uri: "/index/xyz", - apiToken: dataManagerToken, + apiToken: theConfig.systemAuthToken, } // ------------------------------------------------------------- // Only the superuser should be allowed to issue /index requests. // --------------------------- - // enforcePermissions enabled + // RequireSignatures enabled // This setting should not affect tests passing. 
- enforcePermissions = true + theConfig.RequireSignatures = true // unauthenticated /index request // => UnauthorizedError response := IssueRequest(unauthenticatedReq) ExpectStatusCode(t, - "enforcePermissions on, unauthenticated request", + "RequireSignatures on, unauthenticated request", UnauthorizedError.HTTPCode, response) @@ -381,9 +384,9 @@ func TestIndexHandler(t *testing.T) { response) // ---------------------------- - // enforcePermissions disabled + // RequireSignatures disabled // Valid Request should still pass. - enforcePermissions = false + theConfig.RequireSignatures = false // superuser /index request // => OK @@ -475,17 +478,17 @@ func TestDeleteHandler(t *testing.T) { defer KeepVM.Close() vols := KeepVM.AllWritable() - vols[0].Put(TestHash, TestBlock) + vols[0].Put(context.Background(), TestHash, TestBlock) - // Explicitly set the blobSignatureTTL to 0 for these + // Explicitly set the BlobSignatureTTL to 0 for these // tests, to ensure the MockVolume deletes the blocks // even though they have just been created. - blobSignatureTTL = time.Duration(0) + theConfig.BlobSignatureTTL = arvados.Duration(0) var userToken = "NOT DATA MANAGER TOKEN" - dataManagerToken = "DATA MANAGER TOKEN" + theConfig.systemAuthToken = "DATA MANAGER TOKEN" - neverDelete = false + theConfig.EnableDelete = true unauthReq := &RequestTester{ method: "DELETE", @@ -501,13 +504,13 @@ func TestDeleteHandler(t *testing.T) { superuserExistingBlockReq := &RequestTester{ method: "DELETE", uri: "/" + TestHash, - apiToken: dataManagerToken, + apiToken: theConfig.systemAuthToken, } superuserNonexistentBlockReq := &RequestTester{ method: "DELETE", uri: "/" + TestHash2, - apiToken: dataManagerToken, + apiToken: theConfig.systemAuthToken, } // Unauthenticated request returns PermissionError. @@ -538,14 +541,14 @@ func TestDeleteHandler(t *testing.T) { http.StatusNotFound, response) - // Authenticated admin request for existing block while neverDelete is set. 
- neverDelete = true + // Authenticated admin request for existing block while EnableDelete is false. + theConfig.EnableDelete = false response = IssueRequest(superuserExistingBlockReq) ExpectStatusCode(t, "authenticated request, existing block, method disabled", MethodDisabledError.HTTPCode, response) - neverDelete = false + theConfig.EnableDelete = true // Authenticated admin request for existing block. response = IssueRequest(superuserExistingBlockReq) @@ -561,16 +564,17 @@ func TestDeleteHandler(t *testing.T) { expectedDc, responseDc) } // Confirm the block has been deleted - _, err := vols[0].Get(TestHash) + buf := make([]byte, BlockSize) + _, err := vols[0].Get(context.Background(), TestHash, buf) var blockDeleted = os.IsNotExist(err) if !blockDeleted { t.Error("superuserExistingBlockReq: block not deleted") } - // A DELETE request on a block newer than blobSignatureTTL + // A DELETE request on a block newer than BlobSignatureTTL // should return success but leave the block on the volume. - vols[0].Put(TestHash, TestBlock) - blobSignatureTTL = time.Hour + vols[0].Put(context.Background(), TestHash, TestBlock) + theConfig.BlobSignatureTTL = arvados.Duration(time.Hour) response = IssueRequest(superuserExistingBlockReq) ExpectStatusCode(t, @@ -585,7 +589,7 @@ func TestDeleteHandler(t *testing.T) { expectedDc, responseDc) } // Confirm the block has NOT been deleted. 
- _, err = vols[0].Get(TestHash) + _, err = vols[0].Get(context.Background(), TestHash, buf) if err != nil { t.Errorf("testing delete on new block: %s\n", err) } @@ -622,7 +626,7 @@ func TestPullHandler(t *testing.T) { defer teardown() var userToken = "USER TOKEN" - dataManagerToken = "DATA MANAGER TOKEN" + theConfig.systemAuthToken = "DATA MANAGER TOKEN" pullq = NewWorkQueue() @@ -667,13 +671,13 @@ func TestPullHandler(t *testing.T) { }, { "Valid pull request from the data manager", - RequestTester{"/pull", dataManagerToken, "PUT", goodJSON}, + RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON}, http.StatusOK, "Received 3 pull requests\n", }, { "Invalid pull request from the data manager", - RequestTester{"/pull", dataManagerToken, "PUT", badJSON}, + RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON}, http.StatusBadRequest, "", }, @@ -728,7 +732,7 @@ func TestTrashHandler(t *testing.T) { defer teardown() var userToken = "USER TOKEN" - dataManagerToken = "DATA MANAGER TOKEN" + theConfig.systemAuthToken = "DATA MANAGER TOKEN" trashq = NewWorkQueue() @@ -771,13 +775,13 @@ func TestTrashHandler(t *testing.T) { }, { "Valid trash list from the data manager", - RequestTester{"/trash", dataManagerToken, "PUT", goodJSON}, + RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON}, http.StatusOK, "Received 3 trash requests\n", }, { "Invalid trash list from the data manager", - RequestTester{"/trash", dataManagerToken, "PUT", badJSON}, + RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON}, http.StatusBadRequest, "", }, @@ -814,7 +818,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder { if rt.apiToken != "" { req.Header.Set("Authorization", "OAuth2 "+rt.apiToken) } - loggingRouter := MakeLoggingRESTRouter() + loggingRouter := MakeRESTRouter() loggingRouter.ServeHTTP(response, req) return response } @@ -872,7 +876,7 @@ func TestPutNeedsOnlyOneBuffer(t *testing.T) { select { case <-ok: case 
<-time.After(time.Second): - t.Fatal("PUT deadlocks with maxBuffers==1") + t.Fatal("PUT deadlocks with MaxBuffers==1") } } @@ -887,7 +891,7 @@ func TestPutHandlerNoBufferleak(t *testing.T) { ok := make(chan bool) go func() { - for i := 0; i < maxBuffers+1; i++ { + for i := 0; i < theConfig.MaxBuffers+1; i++ { // Unauthenticated request, no server key // => OK (unsigned response) unsignedLocator := "/" + TestHash @@ -913,9 +917,68 @@ func TestPutHandlerNoBufferleak(t *testing.T) { } } +type notifyingResponseRecorder struct { + *httptest.ResponseRecorder + closer chan bool +} + +func (r *notifyingResponseRecorder) CloseNotify() <-chan bool { + return r.closer +} + +func TestGetHandlerClientDisconnect(t *testing.T) { + defer func(was bool) { + theConfig.RequireSignatures = was + }(theConfig.RequireSignatures) + theConfig.RequireSignatures = false + + defer func(orig *bufferPool) { + bufs = orig + }(bufs) + bufs = newBufferPool(1, BlockSize) + defer bufs.Put(bufs.Get(BlockSize)) + + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + + if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil { + t.Error(err) + } + + resp := &notifyingResponseRecorder{ + ResponseRecorder: httptest.NewRecorder(), + closer: make(chan bool, 1), + } + if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok { + t.Fatal("notifyingResponseRecorder is broken") + } + // If anyone asks, the client has disconnected. 
+ resp.closer <- true + + ok := make(chan struct{}) + go func() { + req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil) + (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req) + ok <- struct{}{} + }() + + select { + case <-time.After(20 * time.Second): + t.Fatal("request took >20s, close notifier must be broken") + case <-ok: + } + + ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder) + for i, v := range KeepVM.AllWritable() { + if calls := v.(*MockVolume).called["GET"]; calls != 0 { + t.Errorf("volume %d got %d calls, expected 0", i, calls) + } + } +} + // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource // leak. -func TestGetHandlerNoBufferleak(t *testing.T) { +func TestGetHandlerNoBufferLeak(t *testing.T) { defer teardown() // Prepare two test Keep volumes. Our block is stored on the second volume. @@ -923,13 +986,13 @@ func TestGetHandlerNoBufferleak(t *testing.T) { defer KeepVM.Close() vols := KeepVM.AllWritable() - if err := vols[0].Put(TestHash, TestBlock); err != nil { + if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil { t.Error(err) } ok := make(chan bool) go func() { - for i := 0; i < maxBuffers+1; i++ { + for i := 0; i < theConfig.MaxBuffers+1; i++ { // Unauthenticated request, unsigned locator // => OK unsignedLocator := "/" + TestHash @@ -970,3 +1033,106 @@ func TestPutReplicationHeader(t *testing.T) { t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1") } } + +func TestUntrashHandler(t *testing.T) { + defer teardown() + + // Set up Keep volumes + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + vols := KeepVM.AllWritable() + vols[0].Put(context.Background(), TestHash, TestBlock) + + theConfig.systemAuthToken = "DATA MANAGER TOKEN" + + // unauthenticatedReq => UnauthorizedError + unauthenticatedReq := &RequestTester{ + method: "PUT", + uri: "/untrash/" + TestHash, + } + response := 
IssueRequest(unauthenticatedReq) + ExpectStatusCode(t, + "Unauthenticated request", + UnauthorizedError.HTTPCode, + response) + + // notDataManagerReq => UnauthorizedError + notDataManagerReq := &RequestTester{ + method: "PUT", + uri: "/untrash/" + TestHash, + apiToken: knownToken, + } + + response = IssueRequest(notDataManagerReq) + ExpectStatusCode(t, + "Non-datamanager token", + UnauthorizedError.HTTPCode, + response) + + // datamanagerWithBadHashReq => StatusBadRequest + datamanagerWithBadHashReq := &RequestTester{ + method: "PUT", + uri: "/untrash/thisisnotalocator", + apiToken: theConfig.systemAuthToken, + } + response = IssueRequest(datamanagerWithBadHashReq) + ExpectStatusCode(t, + "Bad locator in untrash request", + http.StatusBadRequest, + response) + + // datamanagerWrongMethodReq => StatusBadRequest + datamanagerWrongMethodReq := &RequestTester{ + method: "GET", + uri: "/untrash/" + TestHash, + apiToken: theConfig.systemAuthToken, + } + response = IssueRequest(datamanagerWrongMethodReq) + ExpectStatusCode(t, + "Only PUT method is supported for untrash", + http.StatusBadRequest, + response) + + // datamanagerReq => StatusOK + datamanagerReq := &RequestTester{ + method: "PUT", + uri: "/untrash/" + TestHash, + apiToken: theConfig.systemAuthToken, + } + response = IssueRequest(datamanagerReq) + ExpectStatusCode(t, + "", + http.StatusOK, + response) + expected := "Successfully untrashed on: [MockVolume],[MockVolume]" + if response.Body.String() != expected { + t.Errorf( + "Untrash response mismatched: expected %s, got:\n%s", + expected, response.Body.String()) + } +} + +func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) { + defer teardown() + + // Set up readonly Keep volumes + vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()} + vols[0].Readonly = true + vols[1].Readonly = true + KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]}) + defer KeepVM.Close() + + theConfig.systemAuthToken = "DATA MANAGER TOKEN" + + // datamanagerReq => 
StatusOK + datamanagerReq := &RequestTester{ + method: "PUT", + uri: "/untrash/" + TestHash, + apiToken: theConfig.systemAuthToken, + } + response := IssueRequest(datamanagerReq) + ExpectStatusCode(t, + "No writable volumes", + http.StatusNotFound, + response) +}