X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/44c95f99098fa6c6acbfa82d4b6cbc6015eb6e39..8a27fe370239ecb8e50d53f46b45ed61203a35ca:/services/keepstore/pull_worker_test.go diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index 9e547f30d0..2626e66d88 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -2,24 +2,30 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( "bytes" + "context" "errors" "io" "io/ioutil" "net/http" "time" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/keepclient" + "github.com/prometheus/client_golang/prometheus" . "gopkg.in/check.v1" + check "gopkg.in/check.v1" ) var _ = Suite(&PullWorkerTestSuite{}) type PullWorkerTestSuite struct { + cluster *arvados.Cluster + handler *handler + testPullLists map[string]string readContent string readError error @@ -28,7 +34,16 @@ type PullWorkerTestSuite struct { } func (s *PullWorkerTestSuite) SetUpTest(c *C) { - theConfig.systemAuthToken = "arbitrary data manager token" + s.cluster = testCluster(c) + s.cluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-000000000000000": {Driver: "mock"}, + "zzzzz-nyw5e-111111111111111": {Driver: "mock"}, + } + s.cluster.Collections.BlobReplicateConcurrency = 1 + + s.handler = &handler{} + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) + s.readContent = "" s.readError = nil s.putContent = []byte{} @@ -38,27 +53,6 @@ func (s *PullWorkerTestSuite) SetUpTest(c *C) { // This behavior is verified using these two maps in the // "TestPullWorkerPullList_with_two_items_latest_replacing_old" s.testPullLists = make(map[string]string) - - KeepVM = MakeTestVolumeManager(2) - - // Normally the pull queue and workers are started by main() - // -- tests need to set up their own. - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, IsNil) - keepClient, err := keepclient.MakeKeepClient(arv) - c.Assert(err, IsNil) - pullq = NewWorkQueue() - go RunPullWorker(pullq, keepClient) -} - -func (s *PullWorkerTestSuite) TearDownTest(c *C) { - KeepVM.Close() - KeepVM = nil - pullq.Close() - pullq = nil - teardown() - theConfig = DefaultConfig() - theConfig.Start() } var firstPullList = []byte(`[ @@ -99,9 +93,10 @@ type PullWorkerTestData struct { // Ensure MountUUID in a pull list is correctly translated to a Volume // argument passed to writePulledBlock(). func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) { - defer func(f func(Volume, []byte, string)) { + defer func(f func(*RRVolumeManager, Volume, []byte, string) error) { writePulledBlock = f }(writePulledBlock) + pullq := s.handler.Handler.(*router).pullq for _, spec := range []struct { sendUUID string @@ -112,22 +107,23 @@ func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) { expectVolume: nil, }, { - sendUUID: KeepVM.Mounts()[0].UUID, - expectVolume: KeepVM.Mounts()[0].volume, + sendUUID: s.handler.volmgr.Mounts()[0].UUID, + expectVolume: s.handler.volmgr.Mounts()[0].Volume, }, } { - writePulledBlock = func(v Volume, _ []byte, _ string) { + writePulledBlock = func(_ *RRVolumeManager, v Volume, _ []byte, _ string) error { c.Check(v, Equals, spec.expectVolume) + return nil } - resp := IssueRequest(&RequestTester{ + resp := IssueRequest(s.handler, &RequestTester{ uri: "/pull", - apiToken: theConfig.systemAuthToken, + apiToken: s.cluster.SystemRootToken, method: "PUT", requestBody: []byte(`[{ "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3", "servers":["server_1","server_2"], - "mountuuid":"` + spec.sendUUID + `"}]`), + "mount_uuid":"` + spec.sendUUID + `"}]`), }) c.Assert(resp.Code, Equals, http.StatusOK) expectEqualWithin(c, time.Second, 0, func() interface{} { @@ -140,7 +136,7 @@ func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) { func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) { testData := PullWorkerTestData{ name: "TestPullWorkerPullList_with_two_locators", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 2 pull requests\n", readContent: "hello", @@ -154,7 +150,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) { func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) { testData := PullWorkerTestData{ name: "TestPullWorkerPullList_with_one_locator", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 1 pull requests\n", readContent: "hola", @@ -168,7 +164,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) { testData := PullWorkerTestData{ name: "TestPullWorker_error_on_get_one_locator", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 1 pull requests\n", readContent: "unused", @@ -182,7 +178,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) { testData := PullWorkerTestData{ name: "TestPullWorker_error_on_get_two_locators", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 2 pull requests\n", readContent: "unused", @@ -196,7 +192,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) { testData := PullWorkerTestData{ name: "TestPullWorker_error_on_put_one_locator", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 1 pull requests\n", readContent: "hello hello", @@ -210,7 +206,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) { testData := PullWorkerTestData{ name: "TestPullWorker_error_on_put_two_locators", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 2 pull requests\n", readContent: "hello again", @@ -225,7 +221,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) { testData := PullWorkerTestData{ name: "TestPullWorkerPullList_with_two_locators", - req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList}, + req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList, ""}, responseCode: http.StatusUnauthorized, responseBody: "Unauthorized\n", readContent: "hello", @@ -237,6 +233,8 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) { } func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) { + pullq := s.handler.Handler.(*router).pullq + s.testPullLists[testData.name] = testData.responseBody processedPullLists := make(map[string]string) @@ -246,7 +244,7 @@ func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) { GetContent = orig }(GetContent) GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) { - c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1)) + c.Assert(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(1)) processedPullLists[testData.name] = testData.responseBody if testData.readError { err = errors.New("Error getting data") @@ -260,19 +258,21 @@ func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) { } // Override writePulledBlock to mock PutBlock functionality - defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock) - writePulledBlock = func(v Volume, content []byte, locator string) { + defer func(orig func(*RRVolumeManager, Volume, []byte, string) error) { writePulledBlock = orig }(writePulledBlock) + writePulledBlock = func(_ *RRVolumeManager, v Volume, content []byte, locator string) error { if testData.putError { s.putError = errors.New("Error putting data") - return + return s.putError } s.putContent = content + return nil } - c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0)) - c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0)) + c.Check(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(0)) + c.Check(getStatusItem(s.handler, "PullQueue", "Queued"), Equals, float64(0)) + c.Check(getStatusItem(s.handler, "Version"), Not(Equals), "") - response := IssueRequest(&testData.req) + response := IssueRequest(s.handler, &testData.req) c.Assert(response.Code, Equals, testData.responseCode) c.Assert(response.Body.String(), Equals, testData.responseBody)