X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fd50115d7e01a595f65e47dd95b129362d18e975..f04693da1811e670d4cbb981debeecf14d79137c:/services/keepstore/handler_test.go diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 7429d7ad41..251ad0a1df 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -23,13 +23,51 @@ import ( "os" "regexp" "strings" - "testing" "time" + "git.curoverse.com/arvados.git/lib/config" "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadostest" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "github.com/prometheus/client_golang/prometheus" + check "gopkg.in/check.v1" ) +var testServiceURL = func() arvados.URL { + return arvados.URL{Host: "localhost:12345", Scheme: "http"} +}() + +func testCluster(t TB) *arvados.Cluster { + cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load() + if err != nil { + t.Fatal(err) + } + cluster, err := cfg.GetCluster("") + if err != nil { + t.Fatal(err) + } + cluster.SystemRootToken = arvadostest.DataManagerToken + cluster.ManagementToken = arvadostest.ManagementToken + cluster.Collections.BlobSigning = false + return cluster +} + +var _ = check.Suite(&HandlerSuite{}) + +type HandlerSuite struct { + cluster *arvados.Cluster + handler *handler +} + +func (s *HandlerSuite) SetUpTest(c *check.C) { + s.cluster = testCluster(c) + s.cluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "mock"}, + "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "mock"}, + } + s.handler = &handler{} +} + // A RequestTester represents the parameters for an HTTP request to // be issued on behalf of a unit test. type RequestTester struct { @@ -45,47 +83,43 @@ type RequestTester struct { // - permissions on, authenticated request, unsigned locator // - permissions on, unauthenticated request, signed locator // - permissions on, authenticated request, expired locator +// - permissions on, authenticated request, signed locator, transient error from backend // -func TestGetHandler(t *testing.T) { - defer teardown() - - // Prepare two test Keep volumes. Our block is stored on the second volume. - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() +func (s *HandlerSuite) TestGetHandler(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) - vols := KeepVM.AllWritable() - if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil { - t.Error(err) - } + vols := s.handler.volmgr.AllWritable() + err := vols[0].Put(context.Background(), TestHash, TestBlock) + c.Check(err, check.IsNil) // Create locators for testing. // Turn on permission settings so we can generate signed locators. 
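
For context, the signed-locator scheme these tests exercise works roughly like this — a simplified sketch, not the SDK's actual SignLocator, and the exact inputs fed to the HMAC here (locator, token, hex expiry) are assumptions for illustration:

    // Toy version of a Keep-style signed locator: "<hash>+<size>+A<sig>@<hex expiry>".
    package main

    import (
    	"crypto/hmac"
    	"crypto/sha1"
    	"fmt"
    	"time"
    )

    func signLocator(key []byte, blobLocator, apiToken string, expiry time.Time) string {
    	mac := hmac.New(sha1.New, key)
    	exp := fmt.Sprintf("%x", expiry.Unix())
    	// The fields hashed here are illustrative assumptions, not the canonical scheme.
    	fmt.Fprint(mac, blobLocator, apiToken, exp)
    	return fmt.Sprintf("%s+A%x@%s", blobLocator, mac.Sum(nil), exp)
    }

    func main() {
    	loc := signLocator([]byte("example-key"), "acbd18db4cc2f85cedef654fccc4a4d8+3",
    		"example-token", time.Now().Add(5*time.Minute))
    	fmt.Println(loc) // e.g. acbd...+3+A<40 hex digits>@<hex timestamp>
    }

A locator signed with an expired timestamp fails verification the same way a tampered one does, which is what the expired-locator cases below rely on.
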
- theConfig.RequireSignatures = true - theConfig.blobSigningKey = []byte(knownKey) - theConfig.BlobSignatureTTL.Set("5m") + s.cluster.Collections.BlobSigning = true + s.cluster.Collections.BlobSigningKey = knownKey + s.cluster.Collections.BlobSigningTTL.Set("5m") var ( unsignedLocator = "/" + TestHash - validTimestamp = time.Now().Add(theConfig.BlobSignatureTTL.Duration()) + validTimestamp = time.Now().Add(s.cluster.Collections.BlobSigningTTL.Duration()) expiredTimestamp = time.Now().Add(-time.Hour) - signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp) - expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp) + signedLocator = "/" + SignLocator(s.cluster, TestHash, knownToken, validTimestamp) + expiredLocator = "/" + SignLocator(s.cluster, TestHash, knownToken, expiredTimestamp) ) // ----------------- // Test unauthenticated request with permissions off. - theConfig.RequireSignatures = false + s.cluster.Collections.BlobSigning = false // Unauthenticated request, unsigned locator // => OK - response := IssueRequest( + response := IssueRequest(s.handler, &RequestTester{ method: "GET", uri: unsignedLocator, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "Unauthenticated request, unsigned locator", http.StatusOK, response) - ExpectBody(t, + ExpectBody(c, "Unauthenticated request, unsigned locator", string(TestBlock), response) @@ -93,60 +127,77 @@ func TestGetHandler(t *testing.T) { receivedLen := response.Header().Get("Content-Length") expectedLen := fmt.Sprintf("%d", len(TestBlock)) if receivedLen != expectedLen { - t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen) + c.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen) } // ---------------- // Permissions: on. - theConfig.RequireSignatures = true + s.cluster.Collections.BlobSigning = true // Authenticated request, signed locator // => OK - response = IssueRequest(&RequestTester{ + response = IssueRequest(s.handler, &RequestTester{ method: "GET", uri: signedLocator, apiToken: knownToken, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "Authenticated request, signed locator", http.StatusOK, response) - ExpectBody(t, + ExpectBody(c, "Authenticated request, signed locator", string(TestBlock), response) receivedLen = response.Header().Get("Content-Length") expectedLen = fmt.Sprintf("%d", len(TestBlock)) if receivedLen != expectedLen { - t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen) + c.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen) } // Authenticated request, unsigned locator // => PermissionError - response = IssueRequest(&RequestTester{ + response = IssueRequest(s.handler, &RequestTester{ method: "GET", uri: unsignedLocator, apiToken: knownToken, }) - ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response) + ExpectStatusCode(c, "unsigned locator", PermissionError.HTTPCode, response) // Unauthenticated request, signed locator // => PermissionError - response = IssueRequest(&RequestTester{ + response = IssueRequest(s.handler, &RequestTester{ method: "GET", uri: signedLocator, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "Unauthenticated request, signed locator", PermissionError.HTTPCode, response) // Authenticated request, expired locator // => ExpiredError - response = IssueRequest(&RequestTester{ + response = IssueRequest(s.handler, &RequestTester{ method: "GET", uri: expiredLocator, apiToken: knownToken, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "Authenticated request, expired locator", 
ExpiredError.HTTPCode, response) + + // Authenticated request, signed locator + // => 503 Server busy (transient error) + + // Set up the block owning volume to respond with errors + vols[0].Volume.(*MockVolume).Bad = true + vols[0].Volume.(*MockVolume).BadVolumeError = VolumeBusyError + response = IssueRequest(s.handler, &RequestTester{ + method: "GET", + uri: signedLocator, + apiToken: knownToken, + }) + // A transient error from one volume while the other doesn't find the block + // should make the service return a 503 so that clients can retry. + ExpectStatusCode(c, + "Volume backend busy", + 503, response) } // Test PutBlockHandler on the following situations: @@ -154,44 +205,42 @@ func TestGetHandler(t *testing.T) { // - with server key, authenticated request, unsigned locator // - with server key, unauthenticated request, unsigned locator // -func TestPutHandler(t *testing.T) { - defer teardown() - - // Prepare two test Keep volumes. - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() +func (s *HandlerSuite) TestPutHandler(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) // -------------- // No server key. + s.cluster.Collections.BlobSigningKey = "" + // Unauthenticated request, no server key // => OK (unsigned response) unsignedLocator := "/" + TestHash - response := IssueRequest( + response := IssueRequest(s.handler, &RequestTester{ method: "PUT", uri: unsignedLocator, requestBody: TestBlock, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "Unauthenticated request, no server key", http.StatusOK, response) - ExpectBody(t, + ExpectBody(c, "Unauthenticated request, no server key", TestHashPutResp, response) // ------------------ // With a server key. - theConfig.blobSigningKey = []byte(knownKey) - theConfig.BlobSignatureTTL.Set("5m") + s.cluster.Collections.BlobSigningKey = knownKey + s.cluster.Collections.BlobSigningTTL.Set("5m") // When a permission key is available, the locator returned // from an authenticated PUT request will be signed. 
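
The 503-on-transient-error behavior tested above exists so clients can retry rather than treat the block as missing. A minimal sketch of that client-side loop, assuming a hypothetical keepURL and an illustrative linear backoff:

    package main

    import (
    	"fmt"
    	"net/http"
    	"time"
    )

    func getBlockWithRetry(keepURL, signedLocator string, attempts int) (*http.Response, error) {
    	for i := 0; i < attempts; i++ {
    		resp, err := http.Get(keepURL + signedLocator)
    		if err == nil && resp.StatusCode != http.StatusServiceUnavailable {
    			return resp, nil // success, or a non-transient error the caller handles
    		}
    		if resp != nil {
    			resp.Body.Close()
    		}
    		time.Sleep(time.Duration(i+1) * 500 * time.Millisecond) // illustrative backoff
    	}
    	return nil, fmt.Errorf("keep server still busy after %d attempts", attempts)
    }

    func main() {
    	resp, err := getBlockWithRetry("http://localhost:25107", "/acbd18db4cc2f85cedef654fccc4a4d8+3", 3)
    	if err != nil {
    		fmt.Println(err)
    		return
    	}
    	defer resp.Body.Close()
    	fmt.Println(resp.Status)
    }
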
// Authenticated PUT, signed locator // => OK (signed response) - response = IssueRequest( + response = IssueRequest(s.handler, &RequestTester{ method: "PUT", uri: unsignedLocator, @@ -199,76 +248,72 @@ func TestPutHandler(t *testing.T) { apiToken: knownToken, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "Authenticated PUT, signed locator, with server key", http.StatusOK, response) responseLocator := strings.TrimSpace(response.Body.String()) - if VerifySignature(responseLocator, knownToken) != nil { - t.Errorf("Authenticated PUT, signed locator, with server key:\n"+ + if VerifySignature(s.cluster, responseLocator, knownToken) != nil { + c.Errorf("Authenticated PUT, signed locator, with server key:\n"+ "response '%s' does not contain a valid signature", responseLocator) } // Unauthenticated PUT, unsigned locator // => OK - response = IssueRequest( + response = IssueRequest(s.handler, &RequestTester{ method: "PUT", uri: unsignedLocator, requestBody: TestBlock, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "Unauthenticated PUT, unsigned locator, with server key", http.StatusOK, response) - ExpectBody(t, + ExpectBody(c, "Unauthenticated PUT, unsigned locator, with server key", TestHashPutResp, response) } -func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) { - defer teardown() - theConfig.systemAuthToken = "fake-data-manager-token" - vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()} - vols[0].Readonly = true - KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]}) - defer KeepVM.Close() - IssueRequest( +func (s *HandlerSuite) TestPutAndDeleteSkipReadonlyVolumes(c *check.C) { + s.cluster.Volumes["zzzzz-nyw5e-000000000000000"] = arvados.Volume{Driver: "mock", ReadOnly: true} + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) + + s.cluster.SystemRootToken = "fake-data-manager-token" + IssueRequest(s.handler, &RequestTester{ method: "PUT", uri: "/" + TestHash, requestBody: TestBlock, }) - defer func(orig bool) { - theConfig.EnableDelete = orig - }(theConfig.EnableDelete) - theConfig.EnableDelete = true - IssueRequest( + + s.cluster.Collections.BlobTrash = true + IssueRequest(s.handler, &RequestTester{ method: "DELETE", uri: "/" + TestHash, requestBody: TestBlock, - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, }) type expect struct { - volnum int + volid string method string callcount int } for _, e := range []expect{ - {0, "Get", 0}, - {0, "Compare", 0}, - {0, "Touch", 0}, - {0, "Put", 0}, - {0, "Delete", 0}, - {1, "Get", 0}, - {1, "Compare", 1}, - {1, "Touch", 1}, - {1, "Put", 1}, - {1, "Delete", 1}, + {"zzzzz-nyw5e-000000000000000", "Get", 0}, + {"zzzzz-nyw5e-000000000000000", "Compare", 0}, + {"zzzzz-nyw5e-000000000000000", "Touch", 0}, + {"zzzzz-nyw5e-000000000000000", "Put", 0}, + {"zzzzz-nyw5e-000000000000000", "Delete", 0}, + {"zzzzz-nyw5e-111111111111111", "Get", 0}, + {"zzzzz-nyw5e-111111111111111", "Compare", 1}, + {"zzzzz-nyw5e-111111111111111", "Touch", 1}, + {"zzzzz-nyw5e-111111111111111", "Put", 1}, + {"zzzzz-nyw5e-111111111111111", "Delete", 1}, } { - if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount { - t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount) + if calls := s.handler.volmgr.mountMap[e.volid].Volume.(*MockVolume).CallCount(e.method); calls != e.callcount { + c.Errorf("Got %d %s() on vol %s, expect %d", calls, e.method, e.volid, e.callcount) } } } @@ -284,22 +329,18 @@ func 
TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) { // The only /index requests that should succeed are those issued by the // superuser. They should pass regardless of the value of RequireSignatures. // -func TestIndexHandler(t *testing.T) { - defer teardown() +func (s *HandlerSuite) TestIndexHandler(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) - // Set up Keep volumes and populate them. // Include multiple blocks on different volumes, and // some metadata files (which should be omitted from index listings) - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - - vols := KeepVM.AllWritable() + vols := s.handler.volmgr.AllWritable() vols[0].Put(context.Background(), TestHash, TestBlock) vols[1].Put(context.Background(), TestHash2, TestBlock2) vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata")) vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata")) - theConfig.systemAuthToken = "DATA MANAGER TOKEN" + s.cluster.SystemRootToken = "DATA MANAGER TOKEN" unauthenticatedReq := &RequestTester{ method: "GET", @@ -313,7 +354,7 @@ func TestIndexHandler(t *testing.T) { superuserReq := &RequestTester{ method: "GET", uri: "/index", - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } unauthPrefixReq := &RequestTester{ method: "GET", @@ -327,76 +368,76 @@ func TestIndexHandler(t *testing.T) { superuserPrefixReq := &RequestTester{ method: "GET", uri: "/index/" + TestHash[0:3], - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } superuserNoSuchPrefixReq := &RequestTester{ method: "GET", uri: "/index/abcd", - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } superuserInvalidPrefixReq := &RequestTester{ method: "GET", uri: "/index/xyz", - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } // ------------------------------------------------------------- // Only the superuser should be allowed to issue /index requests. // --------------------------- - // RequireSignatures enabled + // BlobSigning enabled // This setting should not affect tests passing. 
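
The regexp assertions further down pin the /index wire format: one "<locator> <mtime>" line per block, terminated by a single blank line so clients can detect truncation. A self-contained parsing sketch of that format (illustrative, not the SDK's index client):

    package main

    import (
    	"bufio"
    	"fmt"
    	"strings"
    )

    func parseIndex(body string) ([]string, error) {
    	scanner := bufio.NewScanner(strings.NewReader(body))
    	var locators []string
    	terminated := false
    	for scanner.Scan() {
    		line := scanner.Text()
    		if line == "" {
    			terminated = true // trailing blank line marks a complete listing
    			break
    		}
    		locators = append(locators, strings.Fields(line)[0])
    	}
    	if !terminated {
    		return nil, fmt.Errorf("index listing was truncated")
    	}
    	return locators, nil
    }

    func main() {
    	locs, err := parseIndex("acbd18db4cc2f85cedef654fccc4a4d8+3 1234567890\n\n")
    	fmt.Println(locs, err)
    }
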
- theConfig.RequireSignatures = true + s.cluster.Collections.BlobSigning = true // unauthenticated /index request // => UnauthorizedError - response := IssueRequest(unauthenticatedReq) - ExpectStatusCode(t, + response := IssueRequest(s.handler, unauthenticatedReq) + ExpectStatusCode(c, "RequireSignatures on, unauthenticated request", UnauthorizedError.HTTPCode, response) // unauthenticated /index/prefix request // => UnauthorizedError - response = IssueRequest(unauthPrefixReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, unauthPrefixReq) + ExpectStatusCode(c, "permissions on, unauthenticated /index/prefix request", UnauthorizedError.HTTPCode, response) // authenticated /index request, non-superuser // => UnauthorizedError - response = IssueRequest(authenticatedReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, authenticatedReq) + ExpectStatusCode(c, "permissions on, authenticated request, non-superuser", UnauthorizedError.HTTPCode, response) // authenticated /index/prefix request, non-superuser // => UnauthorizedError - response = IssueRequest(authPrefixReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, authPrefixReq) + ExpectStatusCode(c, "permissions on, authenticated /index/prefix request, non-superuser", UnauthorizedError.HTTPCode, response) // superuser /index request // => OK - response = IssueRequest(superuserReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, superuserReq) + ExpectStatusCode(c, "permissions on, superuser request", http.StatusOK, response) // ---------------------------- - // RequireSignatures disabled + // BlobSigning disabled // Valid Request should still pass. - theConfig.RequireSignatures = false + s.cluster.Collections.BlobSigning = false // superuser /index request // => OK - response = IssueRequest(superuserReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, superuserReq) + ExpectStatusCode(c, "permissions on, superuser request", http.StatusOK, response) @@ -405,15 +446,15 @@ func TestIndexHandler(t *testing.T) { TestHash2 + `\+\d+ \d+\n\n$` match, _ := regexp.MatchString(expected, response.Body.String()) if !match { - t.Errorf( + c.Errorf( "permissions on, superuser request: expected %s, got:\n%s", expected, response.Body.String()) } // superuser /index/prefix request // => OK - response = IssueRequest(superuserPrefixReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, superuserPrefixReq) + ExpectStatusCode(c, "permissions on, superuser request", http.StatusOK, response) @@ -421,27 +462,27 @@ func TestIndexHandler(t *testing.T) { expected = `^` + TestHash + `\+\d+ \d+\n\n$` match, _ = regexp.MatchString(expected, response.Body.String()) if !match { - t.Errorf( + c.Errorf( "permissions on, superuser /index/prefix request: expected %s, got:\n%s", expected, response.Body.String()) } // superuser /index/{no-such-prefix} request // => OK - response = IssueRequest(superuserNoSuchPrefixReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, superuserNoSuchPrefixReq) + ExpectStatusCode(c, "permissions on, superuser request", http.StatusOK, response) if "\n" != response.Body.String() { - t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String()) + c.Errorf("Expected empty response for %s. 
Found %s", superuserNoSuchPrefixReq.uri, response.Body.String()) } // superuser /index/{invalid-prefix} request // => StatusBadRequest - response = IssueRequest(superuserInvalidPrefixReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, superuserInvalidPrefixReq) + ExpectStatusCode(c, "permissions on, superuser request", http.StatusBadRequest, response) @@ -473,27 +514,21 @@ func TestIndexHandler(t *testing.T) { // (test for 200 OK, response with copies_deleted=0, copies_failed=1, // confirm block not deleted) // -func TestDeleteHandler(t *testing.T) { - defer teardown() +func (s *HandlerSuite) TestDeleteHandler(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) - // Set up Keep volumes and populate them. - // Include multiple blocks on different volumes, and - // some metadata files (which should be omitted from index listings) - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - - vols := KeepVM.AllWritable() + vols := s.handler.volmgr.AllWritable() vols[0].Put(context.Background(), TestHash, TestBlock) // Explicitly set the BlobSignatureTTL to 0 for these // tests, to ensure the MockVolume deletes the blocks // even though they have just been created. - theConfig.BlobSignatureTTL = arvados.Duration(0) + s.cluster.Collections.BlobSigningTTL = arvados.Duration(0) var userToken = "NOT DATA MANAGER TOKEN" - theConfig.systemAuthToken = "DATA MANAGER TOKEN" + s.cluster.SystemRootToken = "DATA MANAGER TOKEN" - theConfig.EnableDelete = true + s.cluster.Collections.BlobTrash = true unauthReq := &RequestTester{ method: "DELETE", @@ -509,26 +544,26 @@ func TestDeleteHandler(t *testing.T) { superuserExistingBlockReq := &RequestTester{ method: "DELETE", uri: "/" + TestHash, - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } superuserNonexistentBlockReq := &RequestTester{ method: "DELETE", uri: "/" + TestHash2, - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } // Unauthenticated request returns PermissionError. var response *httptest.ResponseRecorder - response = IssueRequest(unauthReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, unauthReq) + ExpectStatusCode(c, "unauthenticated request", PermissionError.HTTPCode, response) // Authenticated non-admin request returns PermissionError. - response = IssueRequest(userReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, userReq) + ExpectStatusCode(c, "authenticated non-admin request", PermissionError.HTTPCode, response) @@ -540,24 +575,24 @@ func TestDeleteHandler(t *testing.T) { } var responseDc, expectedDc deletecounter - response = IssueRequest(superuserNonexistentBlockReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, superuserNonexistentBlockReq) + ExpectStatusCode(c, "data manager request, nonexistent block", http.StatusNotFound, response) - // Authenticated admin request for existing block while EnableDelete is false. - theConfig.EnableDelete = false - response = IssueRequest(superuserExistingBlockReq) - ExpectStatusCode(t, + // Authenticated admin request for existing block while BlobTrash is false. + s.cluster.Collections.BlobTrash = false + response = IssueRequest(s.handler, superuserExistingBlockReq) + ExpectStatusCode(c, "authenticated request, existing block, method disabled", MethodDisabledError.HTTPCode, response) - theConfig.EnableDelete = true + s.cluster.Collections.BlobTrash = true // Authenticated admin request for existing block. 
- response = IssueRequest(superuserExistingBlockReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, superuserExistingBlockReq) + ExpectStatusCode(c, "data manager request, existing block", http.StatusOK, response) @@ -565,7 +600,7 @@ func TestDeleteHandler(t *testing.T) { expectedDc = deletecounter{1, 0} json.NewDecoder(response.Body).Decode(&responseDc) if responseDc != expectedDc { - t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v", + c.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v", expectedDc, responseDc) } // Confirm the block has been deleted @@ -573,16 +608,16 @@ func TestDeleteHandler(t *testing.T) { _, err := vols[0].Get(context.Background(), TestHash, buf) var blockDeleted = os.IsNotExist(err) if !blockDeleted { - t.Error("superuserExistingBlockReq: block not deleted") + c.Error("superuserExistingBlockReq: block not deleted") } // A DELETE request on a block newer than BlobSignatureTTL // should return success but leave the block on the volume. vols[0].Put(context.Background(), TestHash, TestBlock) - theConfig.BlobSignatureTTL = arvados.Duration(time.Hour) + s.cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour) - response = IssueRequest(superuserExistingBlockReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, superuserExistingBlockReq) + ExpectStatusCode(c, "data manager request, existing block", http.StatusOK, response) @@ -590,13 +625,13 @@ func TestDeleteHandler(t *testing.T) { expectedDc = deletecounter{1, 0} json.NewDecoder(response.Body).Decode(&responseDc) if responseDc != expectedDc { - t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v", + c.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v", expectedDc, responseDc) } // Confirm the block has NOT been deleted. _, err = vols[0].Get(context.Background(), TestHash, buf) if err != nil { - t.Errorf("testing delete on new block: %s\n", err) + c.Errorf("testing delete on new block: %s\n", err) } } @@ -627,29 +662,33 @@ func TestDeleteHandler(t *testing.T) { // pull list simultaneously. Make sure that none of them return 400 // Bad Request and that pullq.GetList() returns a valid list. // -func TestPullHandler(t *testing.T) { - defer teardown() +func (s *HandlerSuite) TestPullHandler(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) - var userToken = "USER TOKEN" - theConfig.systemAuthToken = "DATA MANAGER TOKEN" + // Replace the router's pullq -- which the worker goroutines + // started by setup() are now receiving from -- with a new + // one, so we can see what the handler sends to it. 
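
The goodJSON payload below illustrates the pull-list wire format: an array of objects pairing a locator with the servers it can be fetched from. A minimal sketch of unmarshalling it — the struct here mirrors the JSON the test sends; keepstore's real PullRequest type may carry more fields:

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    type pullRequest struct {
    	Locator string   `json:"locator"`
    	Servers []string `json:"servers"`
    }

    func main() {
    	payload := []byte(`[
    		{"locator":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345",
    		 "servers":["http://server1","http://server2"]}
    	]`)
    	var list []pullRequest
    	if err := json.Unmarshal(payload, &list); err != nil {
    		fmt.Println("bad pull list:", err) // the handler answers 400 in this case
    		return
    	}
    	fmt.Printf("received %d pull requests\n", len(list))
    }
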
+ pullq := NewWorkQueue() + s.handler.Handler.(*router).pullq = pullq - pullq = NewWorkQueue() + var userToken = "USER TOKEN" + s.cluster.SystemRootToken = "DATA MANAGER TOKEN" goodJSON := []byte(`[ { - "locator":"locator_with_two_servers", + "locator":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345", "servers":[ - "server1", - "server2" + "http://server1", + "http://server2" ] }, { - "locator":"locator_with_no_servers", + "locator":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+12345", "servers":[] }, { - "locator":"", - "servers":["empty_locator"] + "locator":"cccccccccccccccccccccccccccccccc+12345", + "servers":["http://server1"] } ]`) @@ -676,34 +715,39 @@ func TestPullHandler(t *testing.T) { }, { "Valid pull request from the data manager", - RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON}, + RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON}, http.StatusOK, "Received 3 pull requests\n", }, { "Invalid pull request from the data manager", - RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON}, + RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON}, http.StatusBadRequest, "", }, } for _, tst := range testcases { - response := IssueRequest(&tst.req) - ExpectStatusCode(t, tst.name, tst.responseCode, response) - ExpectBody(t, tst.name, tst.responseBody, response) + response := IssueRequest(s.handler, &tst.req) + ExpectStatusCode(c, tst.name, tst.responseCode, response) + ExpectBody(c, tst.name, tst.responseBody, response) } // The Keep pull manager should have received one good list with 3 // requests on it. for i := 0; i < 3; i++ { - item := <-pullq.NextItem + var item interface{} + select { + case item = <-pullq.NextItem: + case <-time.After(time.Second): + c.Error("timed out") + } if _, ok := item.(PullRequest); !ok { - t.Errorf("item %v could not be parsed as a PullRequest", item) + c.Errorf("item %v could not be parsed as a PullRequest", item) } } - expectChannelEmpty(t, pullq.NextItem) + expectChannelEmpty(c, pullq.NextItem) } // TestTrashHandler @@ -733,13 +777,16 @@ func TestPullHandler(t *testing.T) { // pull list simultaneously. Make sure that none of them return 400 // Bad Request and that replica.Dump() returns a valid list. // -func TestTrashHandler(t *testing.T) { - defer teardown() +func (s *HandlerSuite) TestTrashHandler(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) + // Replace the router's trashq -- which the worker goroutines + // started by setup() are now receiving from -- with a new + // one, so we can see what the handler sends to it. 
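
The trash list works the same way but pairs each locator with the block mtime recorded when the list was built, so the server can refuse to trash a block that has been touched since. A companion sketch — the JSON field names here are assumptions, not confirmed by this diff:

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    type trashRequest struct {
    	Locator    string `json:"locator"`
    	BlockMtime int64  `json:"block_mtime"`
    }

    func main() {
    	payload := []byte(`[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8+3","block_mtime":1409082153}]`)
    	var list []trashRequest
    	if err := json.Unmarshal(payload, &list); err != nil {
    		fmt.Println("bad trash list:", err)
    		return
    	}
    	fmt.Printf("received %d trash requests\n", len(list))
    }
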
+ trashq := NewWorkQueue() + s.handler.Handler.(*router).trashq = trashq var userToken = "USER TOKEN" - theConfig.systemAuthToken = "DATA MANAGER TOKEN" - - trashq = NewWorkQueue() + s.cluster.SystemRootToken = "DATA MANAGER TOKEN" goodJSON := []byte(`[ { @@ -780,22 +827,22 @@ func TestTrashHandler(t *testing.T) { }, { "Valid trash list from the data manager", - RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON}, + RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON}, http.StatusOK, "Received 3 trash requests\n", }, { "Invalid trash list from the data manager", - RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON}, + RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON}, http.StatusBadRequest, "", }, } for _, tst := range testcases { - response := IssueRequest(&tst.req) - ExpectStatusCode(t, tst.name, tst.responseCode, response) - ExpectBody(t, tst.name, tst.responseBody, response) + response := IssueRequest(s.handler, &tst.req) + ExpectStatusCode(c, tst.name, tst.responseCode, response) + ExpectBody(c, tst.name, tst.responseBody, response) } // The trash collector should have received one good list with 3 @@ -803,11 +850,11 @@ func TestTrashHandler(t *testing.T) { for i := 0; i < 3; i++ { item := <-trashq.NextItem if _, ok := item.(TrashRequest); !ok { - t.Errorf("item %v could not be parsed as a TrashRequest", item) + c.Errorf("item %v could not be parsed as a TrashRequest", item) } } - expectChannelEmpty(t, trashq.NextItem) + expectChannelEmpty(c, trashq.NextItem) } // ==================== @@ -816,75 +863,71 @@ func TestTrashHandler(t *testing.T) { // IssueTestRequest executes an HTTP request described by rt, to a // REST router. It returns the HTTP response to the request. -func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder { +func IssueRequest(handler http.Handler, rt *RequestTester) *httptest.ResponseRecorder { response := httptest.NewRecorder() body := bytes.NewReader(rt.requestBody) req, _ := http.NewRequest(rt.method, rt.uri, body) if rt.apiToken != "" { req.Header.Set("Authorization", "OAuth2 "+rt.apiToken) } - loggingRouter := MakeRESTRouter() - loggingRouter.ServeHTTP(response, req) + handler.ServeHTTP(response, req) return response } -func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder { +func IssueHealthCheckRequest(handler http.Handler, rt *RequestTester) *httptest.ResponseRecorder { response := httptest.NewRecorder() body := bytes.NewReader(rt.requestBody) req, _ := http.NewRequest(rt.method, rt.uri, body) if rt.apiToken != "" { req.Header.Set("Authorization", "Bearer "+rt.apiToken) } - loggingRouter := MakeRESTRouter() - loggingRouter.ServeHTTP(response, req) + handler.ServeHTTP(response, req) return response } // ExpectStatusCode checks whether a response has the specified status code, // and reports a test failure if not. 
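
The IssueRequest helpers above boil down to the standard net/http/httptest pattern: build a request, serve it into a ResponseRecorder, then assert on the recorder. A minimal self-contained version of that pattern, with an illustrative handler:

    package main

    import (
    	"fmt"
    	"net/http"
    	"net/http/httptest"
    )

    func main() {
    	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    		if r.Header.Get("Authorization") == "" {
    			http.Error(w, "unauthorized", http.StatusUnauthorized)
    			return
    		}
    		fmt.Fprint(w, "ok")
    	})

    	req := httptest.NewRequest("GET", "/index", nil)
    	req.Header.Set("Authorization", "OAuth2 some-token")
    	rec := httptest.NewRecorder()
    	handler.ServeHTTP(rec, req)

    	fmt.Println(rec.Code, rec.Body.String()) // 200 ok
    }
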
func ExpectStatusCode( - t *testing.T, + c *check.C, testname string, expectedStatus int, response *httptest.ResponseRecorder) { if response.Code != expectedStatus { - t.Errorf("%s: expected status %d, got %+v", + c.Errorf("%s: expected status %d, got %+v", testname, expectedStatus, response) } } func ExpectBody( - t *testing.T, + c *check.C, testname string, expectedBody string, response *httptest.ResponseRecorder) { if expectedBody != "" && response.Body.String() != expectedBody { - t.Errorf("%s: expected response body '%s', got %+v", + c.Errorf("%s: expected response body '%s', got %+v", testname, expectedBody, response) } } // See #7121 -func TestPutNeedsOnlyOneBuffer(t *testing.T) { - defer teardown() - KeepVM = MakeTestVolumeManager(1) - defer KeepVM.Close() +func (s *HandlerSuite) TestPutNeedsOnlyOneBuffer(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) defer func(orig *bufferPool) { bufs = orig }(bufs) - bufs = newBufferPool(1, BlockSize) + bufs = newBufferPool(ctxlog.TestLogger(c), 1, BlockSize) ok := make(chan struct{}) go func() { for i := 0; i < 2; i++ { - response := IssueRequest( + response := IssueRequest(s.handler, &RequestTester{ method: "PUT", uri: "/" + TestHash, requestBody: TestBlock, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "TestPutNeedsOnlyOneBuffer", http.StatusOK, response) } ok <- struct{}{} @@ -893,34 +936,30 @@ func TestPutNeedsOnlyOneBuffer(t *testing.T) { select { case <-ok: case <-time.After(time.Second): - t.Fatal("PUT deadlocks with MaxBuffers==1") + c.Fatal("PUT deadlocks with MaxBuffers==1") } } // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource // leak. -func TestPutHandlerNoBufferleak(t *testing.T) { - defer teardown() - - // Prepare two test Keep volumes. - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() +func (s *HandlerSuite) TestPutHandlerNoBufferleak(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) ok := make(chan bool) go func() { - for i := 0; i < theConfig.MaxBuffers+1; i++ { + for i := 0; i < s.cluster.API.MaxKeepBlockBuffers+1; i++ { // Unauthenticated request, no server key // => OK (unsigned response) unsignedLocator := "/" + TestHash - response := IssueRequest( + response := IssueRequest(s.handler, &RequestTester{ method: "PUT", uri: unsignedLocator, requestBody: TestBlock, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "TestPutHandlerBufferleak", http.StatusOK, response) - ExpectBody(t, + ExpectBody(c, "TestPutHandlerBufferleak", TestHashPutResp, response) } @@ -929,7 +968,7 @@ func TestPutHandlerNoBufferleak(t *testing.T) { select { case <-time.After(20 * time.Second): // If the buffer pool leaks, the test goroutine hangs. 
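
TestPutNeedsOnlyOneBuffer and the leak tests around it depend on the buffer pool being a strict bound: with capacity 1, a second Get must block until the first buffer is Put back. A channel-backed sketch of that idea, including the select/time.After deadlock guard the tests use — illustrative only, not keepstore's bufferPool:

    package main

    import (
    	"fmt"
    	"time"
    )

    type bufferPool struct{ buffers chan []byte }

    func newPool(count, size int) *bufferPool {
    	p := &bufferPool{buffers: make(chan []byte, count)}
    	for i := 0; i < count; i++ {
    		p.buffers <- make([]byte, size)
    	}
    	return p
    }

    func (p *bufferPool) Get() []byte  { return <-p.buffers }
    func (p *bufferPool) Put(b []byte) { p.buffers <- b }

    func main() {
    	pool := newPool(1, 64)
    	buf := pool.Get() // take the only buffer

    	done := make(chan struct{})
    	go func() {
    		pool.Put(buf)  // release it...
    		_ = pool.Get() // ...so a second Get can proceed instead of deadlocking
    		close(done)
    	}()

    	select {
    	case <-done:
    		fmt.Println("no deadlock: buffer was reused")
    	case <-time.After(time.Second):
    		fmt.Println("deadlock: pool leaked its only buffer")
    	}
    }
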
- t.Fatal("test did not finish, assuming pool leaked") + c.Fatal("test did not finish, assuming pool leaked") case <-ok: } } @@ -943,23 +982,18 @@ func (r *notifyingResponseRecorder) CloseNotify() <-chan bool { return r.closer } -func TestGetHandlerClientDisconnect(t *testing.T) { - defer func(was bool) { - theConfig.RequireSignatures = was - }(theConfig.RequireSignatures) - theConfig.RequireSignatures = false +func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) { + s.cluster.Collections.BlobSigning = false + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) defer func(orig *bufferPool) { bufs = orig }(bufs) - bufs = newBufferPool(1, BlockSize) + bufs = newBufferPool(ctxlog.TestLogger(c), 1, BlockSize) defer bufs.Put(bufs.Get(BlockSize)) - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - - if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil { - t.Error(err) + if err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil { + c.Error(err) } resp := ¬ifyingResponseRecorder{ @@ -967,7 +1001,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) { closer: make(chan bool, 1), } if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok { - t.Fatal("notifyingResponseRecorder is broken") + c.Fatal("notifyingResponseRecorder is broken") } // If anyone asks, the client has disconnected. resp.closer <- true @@ -975,52 +1009,48 @@ func TestGetHandlerClientDisconnect(t *testing.T) { ok := make(chan struct{}) go func() { req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil) - (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req) + s.handler.ServeHTTP(resp, req) ok <- struct{}{} }() select { case <-time.After(20 * time.Second): - t.Fatal("request took >20s, close notifier must be broken") + c.Fatal("request took >20s, close notifier must be broken") case <-ok: } - ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder) - for i, v := range KeepVM.AllWritable() { - if calls := v.(*MockVolume).called["GET"]; calls != 0 { - t.Errorf("volume %d got %d calls, expected 0", i, calls) + ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder) + for i, v := range s.handler.volmgr.AllWritable() { + if calls := v.Volume.(*MockVolume).called["GET"]; calls != 0 { + c.Errorf("volume %d got %d calls, expected 0", i, calls) } } } // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource // leak. -func TestGetHandlerNoBufferLeak(t *testing.T) { - defer teardown() - - // Prepare two test Keep volumes. Our block is stored on the second volume. 
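
TestGetHandlerClientDisconnect above detects the dropped client through http.CloseNotifier; in newer Go the same behavior is usually observed through the request context. A rough sketch of that modern equivalent, with an illustrative handler and port:

    package main

    import (
    	"fmt"
    	"log"
    	"net/http"
    	"time"
    )

    func slowBlockHandler(w http.ResponseWriter, r *http.Request) {
    	select {
    	case <-time.After(5 * time.Second): // stand-in for a slow volume read
    		fmt.Fprint(w, "block data")
    	case <-r.Context().Done():
    		// Client went away; give up without touching the volumes.
    		http.Error(w, "client disconnected", http.StatusServiceUnavailable)
    	}
    }

    func main() {
    	http.HandleFunc("/", slowBlockHandler)
    	log.Fatal(http.ListenAndServe("localhost:8080", nil)) // port is illustrative
    }
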
- KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() +func (s *HandlerSuite) TestGetHandlerNoBufferLeak(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) - vols := KeepVM.AllWritable() + vols := s.handler.volmgr.AllWritable() if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil { - t.Error(err) + c.Error(err) } ok := make(chan bool) go func() { - for i := 0; i < theConfig.MaxBuffers+1; i++ { + for i := 0; i < s.cluster.API.MaxKeepBlockBuffers+1; i++ { // Unauthenticated request, unsigned locator // => OK unsignedLocator := "/" + TestHash - response := IssueRequest( + response := IssueRequest(s.handler, &RequestTester{ method: "GET", uri: unsignedLocator, }) - ExpectStatusCode(t, + ExpectStatusCode(c, "Unauthenticated request, unsigned locator", http.StatusOK, response) - ExpectBody(t, + ExpectBody(c, "Unauthenticated request, unsigned locator", string(TestBlock), response) @@ -1030,45 +1060,41 @@ func TestGetHandlerNoBufferLeak(t *testing.T) { select { case <-time.After(20 * time.Second): // If the buffer pool leaks, the test goroutine hangs. - t.Fatal("test did not finish, assuming pool leaked") + c.Fatal("test did not finish, assuming pool leaked") case <-ok: } } -func TestPutReplicationHeader(t *testing.T) { - defer teardown() +func (s *HandlerSuite) TestPutReplicationHeader(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - - resp := IssueRequest(&RequestTester{ + resp := IssueRequest(s.handler, &RequestTester{ method: "PUT", uri: "/" + TestHash, requestBody: TestBlock, }) if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" { - t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1") + c.Logf("%#v", resp) + c.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1") } } -func TestUntrashHandler(t *testing.T) { - defer teardown() +func (s *HandlerSuite) TestUntrashHandler(c *check.C) { + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) // Set up Keep volumes - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - vols := KeepVM.AllWritable() + vols := s.handler.volmgr.AllWritable() vols[0].Put(context.Background(), TestHash, TestBlock) - theConfig.systemAuthToken = "DATA MANAGER TOKEN" + s.cluster.SystemRootToken = "DATA MANAGER TOKEN" // unauthenticatedReq => UnauthorizedError unauthenticatedReq := &RequestTester{ method: "PUT", uri: "/untrash/" + TestHash, } - response := IssueRequest(unauthenticatedReq) - ExpectStatusCode(t, + response := IssueRequest(s.handler, unauthenticatedReq) + ExpectStatusCode(c, "Unauthenticated request", UnauthorizedError.HTTPCode, response) @@ -1080,8 +1106,8 @@ func TestUntrashHandler(t *testing.T) { apiToken: knownToken, } - response = IssueRequest(notDataManagerReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, notDataManagerReq) + ExpectStatusCode(c, "Non-datamanager token", UnauthorizedError.HTTPCode, response) @@ -1090,10 +1116,10 @@ func TestUntrashHandler(t *testing.T) { datamanagerWithBadHashReq := &RequestTester{ method: "PUT", uri: "/untrash/thisisnotalocator", - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } - response = IssueRequest(datamanagerWithBadHashReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, datamanagerWithBadHashReq) + 
ExpectStatusCode(c, "Bad locator in untrash request", http.StatusBadRequest, response) @@ -1102,112 +1128,69 @@ func TestUntrashHandler(t *testing.T) { datamanagerWrongMethodReq := &RequestTester{ method: "GET", uri: "/untrash/" + TestHash, - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } - response = IssueRequest(datamanagerWrongMethodReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, datamanagerWrongMethodReq) + ExpectStatusCode(c, "Only PUT method is supported for untrash", - http.StatusBadRequest, + http.StatusMethodNotAllowed, response) // datamanagerReq => StatusOK datamanagerReq := &RequestTester{ method: "PUT", uri: "/untrash/" + TestHash, - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } - response = IssueRequest(datamanagerReq) - ExpectStatusCode(t, + response = IssueRequest(s.handler, datamanagerReq) + ExpectStatusCode(c, "", http.StatusOK, response) expected := "Successfully untrashed on: [MockVolume],[MockVolume]" if response.Body.String() != expected { - t.Errorf( + c.Errorf( "Untrash response mismatched: expected %s, got:\n%s", expected, response.Body.String()) } } -func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) { - defer teardown() - - // Set up readonly Keep volumes - vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()} - vols[0].Readonly = true - vols[1].Readonly = true - KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]}) - defer KeepVM.Close() - - theConfig.systemAuthToken = "DATA MANAGER TOKEN" +func (s *HandlerSuite) TestUntrashHandlerWithNoWritableVolumes(c *check.C) { + // Change all volumes to read-only + for uuid, v := range s.cluster.Volumes { + v.ReadOnly = true + s.cluster.Volumes[uuid] = v + } + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) // datamanagerReq => StatusOK datamanagerReq := &RequestTester{ method: "PUT", uri: "/untrash/" + TestHash, - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, } - response := IssueRequest(datamanagerReq) - ExpectStatusCode(t, + response := IssueRequest(s.handler, datamanagerReq) + ExpectStatusCode(c, "No writable volumes", http.StatusNotFound, response) } -func TestHealthCheckPing(t *testing.T) { - defer teardown() - - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - - // ping when disabled - theConfig.ManagementToken = "" +func (s *HandlerSuite) TestHealthCheckPing(c *check.C) { + s.cluster.ManagementToken = arvadostest.ManagementToken + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) pingReq := &RequestTester{ - method: "GET", - uri: "/_health/ping", - } - response := IssueHealthCheckRequest(pingReq) - ExpectStatusCode(t, - "disabled", - http.StatusNotFound, - response) - - // ping with no token - theConfig.ManagementToken = arvadostest.ManagementToken - pingReq = &RequestTester{ - method: "GET", - uri: "/_health/ping", - } - response = IssueHealthCheckRequest(pingReq) - ExpectStatusCode(t, - "authorization required", - http.StatusUnauthorized, - response) - - // ping with wrong token - pingReq = &RequestTester{ - method: "GET", - uri: "/_health/ping", - apiToken: "youarenotwelcomehere", - } - response = IssueHealthCheckRequest(pingReq) - ExpectStatusCode(t, - "authorization error", - http.StatusForbidden, - response) - - // ping with management token - pingReq = &RequestTester{ method: "GET", uri: "/_health/ping", apiToken: 
arvadostest.ManagementToken, } - response = IssueHealthCheckRequest(pingReq) - ExpectStatusCode(t, + response := IssueHealthCheckRequest(s.handler, pingReq) + ExpectStatusCode(c, "", http.StatusOK, response) - if !strings.Contains(response.Body.String(), `{"health":"OK"}`) { - t.Errorf("expected response to include %s: got %s", `{"health":"OK"}`, response.Body.String()) + want := `{"health":"OK"}` + if !strings.Contains(response.Body.String(), want) { + c.Errorf("expected response to include %s: got %s", want, response.Body.String()) } }
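
Finally, the health check tested above can be probed from outside the test suite with a plain HTTP client. A sketch, assuming keepstore's conventional port; the Bearer scheme matches IssueHealthCheckRequest, and the token shown is a placeholder:

    package main

    import (
    	"fmt"
    	"io"
    	"net/http"
    )

    func main() {
    	req, err := http.NewRequest("GET", "http://localhost:25107/_health/ping", nil)
    	if err != nil {
    		panic(err)
    	}
    	req.Header.Set("Authorization", "Bearer your-management-token")
    	resp, err := http.DefaultClient.Do(req)
    	if err != nil {
    		fmt.Println("request failed:", err) // e.g. no keepstore running locally
    		return
    	}
    	defer resp.Body.Close()
    	body, _ := io.ReadAll(resp.Body)
    	fmt.Println(resp.StatusCode, string(body)) // expect 200 {"health":"OK"}
    }
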