X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5b863886118890cc81b728a3a606ea823c836f2b..HEAD:/services/keepstore/pull_worker_test.go diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index 8e667e048f..d109b56df3 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -2,316 +2,135 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( "bytes" + "context" + "crypto/md5" + "encoding/json" "errors" + "fmt" "io" - "io/ioutil" "net/http" + "net/http/httptest" + "sort" "time" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "github.com/prometheus/client_golang/prometheus" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadostest" + "github.com/sirupsen/logrus" . "gopkg.in/check.v1" ) -var _ = Suite(&PullWorkerTestSuite{}) - -type PullWorkerTestSuite struct { - testPullLists map[string]string - readContent string - readError error - putContent []byte - putError error -} - -func (s *PullWorkerTestSuite) SetUpTest(c *C) { - theConfig.systemAuthToken = "arbitrary data manager token" - s.readContent = "" - s.readError = nil - s.putContent = []byte{} - s.putError = nil - - // When a new pull request arrives, the old one will be overwritten. - // This behavior is verified using these two maps in the - // "TestPullWorkerPullList_with_two_items_latest_replacing_old" - s.testPullLists = make(map[string]string) - - KeepVM = MakeTestVolumeManager(2) - - // Normally the pull queue and workers are started by main() - // -- tests need to set up their own. - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, IsNil) - keepClient, err := keepclient.MakeKeepClient(arv) - c.Assert(err, IsNil) - pullq = NewWorkQueue() - go RunPullWorker(pullq, keepClient) -} - -func (s *PullWorkerTestSuite) TearDownTest(c *C) { - KeepVM.Close() - KeepVM = nil - pullq.Close() - pullq = nil - teardown() - theConfig = DefaultConfig() - theConfig.Start(prometheus.NewRegistry()) -} - -var firstPullList = []byte(`[ - { - "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3", - "servers":[ - "server_1", - "server_2" - ] - },{ - "locator":"37b51d194a7513e45b56f6524f2d51f2+3", - "servers":[ - "server_3" - ] - } - ]`) - -var secondPullList = []byte(`[ - { - "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3", - "servers":[ - "server_1", - "server_2" - ] - } - ]`) - -type PullWorkerTestData struct { - name string - req RequestTester - responseCode int - responseBody string - readContent string - readError bool - putError bool -} - -// Ensure MountUUID in a pull list is correctly translated to a Volume -// argument passed to writePulledBlock(). -func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) { - defer func(f func(Volume, []byte, string)) { - writePulledBlock = f - }(writePulledBlock) - - for _, spec := range []struct { - sendUUID string - expectVolume Volume - }{ - { - sendUUID: "", - expectVolume: nil, - }, - { - sendUUID: KeepVM.Mounts()[0].UUID, - expectVolume: KeepVM.Mounts()[0].volume, - }, - } { - writePulledBlock = func(v Volume, _ []byte, _ string) { - c.Check(v, Equals, spec.expectVolume) - } - - resp := IssueRequest(&RequestTester{ - uri: "/pull", - apiToken: theConfig.systemAuthToken, - method: "PUT", - requestBody: []byte(`[{ - "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3", - "servers":["server_1","server_2"], - "mount_uuid":"` + spec.sendUUID + `"}]`), - }) - c.Assert(resp.Code, Equals, http.StatusOK) - expectEqualWithin(c, time.Second, 0, func() interface{} { - st := pullq.Status() - return st.InProgress + st.Queued - }) - } -} - -func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) { - testData := PullWorkerTestData{ - name: "TestPullWorkerPullList_with_two_locators", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, - responseCode: http.StatusOK, - responseBody: "Received 2 pull requests\n", - readContent: "hello", - readError: false, - putError: false, - } - - s.performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) { - testData := PullWorkerTestData{ - name: "TestPullWorkerPullList_with_one_locator", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, - responseCode: http.StatusOK, - responseBody: "Received 1 pull requests\n", - readContent: "hola", - readError: false, - putError: false, - } - - s.performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) { - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_get_one_locator", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, - responseCode: http.StatusOK, - responseBody: "Received 1 pull requests\n", - readContent: "unused", - readError: true, - putError: false, - } - - s.performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) { - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_get_two_locators", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, - responseCode: http.StatusOK, - responseBody: "Received 2 pull requests\n", - readContent: "unused", - readError: true, - putError: false, - } - - s.performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) { - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_put_one_locator", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, - responseCode: http.StatusOK, - responseBody: "Received 1 pull requests\n", - readContent: "hello hello", - readError: false, - putError: true, - } - - s.performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) { - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_put_two_locators", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, - responseCode: http.StatusOK, - responseBody: "Received 2 pull requests\n", - readContent: "hello again", - readError: false, - putError: true, - } - - s.performTest(testData, c) -} - -// In this case, the item will not be placed on pullq -func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) { - testData := PullWorkerTestData{ - name: "TestPullWorkerPullList_with_two_locators", - req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList}, - responseCode: http.StatusUnauthorized, - responseBody: "Unauthorized\n", - readContent: "hello", - readError: false, - putError: false, +func (s *routerSuite) TestPullList_Execute(c *C) { + remotecluster := testCluster(c) + remotecluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-rrrrrrrrrrrrrrr": {Replication: 1, Driver: "stub"}, } - - s.performTest(testData, c) -} - -func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) { - s.testPullLists[testData.name] = testData.responseBody - - processedPullLists := make(map[string]string) - - // Override GetContent to mock keepclient Get functionality - defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) { - GetContent = orig - }(GetContent) - GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) { - c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1)) - processedPullLists[testData.name] = testData.responseBody - if testData.readError { - err = errors.New("Error getting data") - s.readError = err - return + remoterouter, cancel := testRouter(c, remotecluster, nil) + defer cancel() + remoteserver := httptest.NewServer(remoterouter) + defer remoteserver.Close() + + router, cancel := testRouter(c, s.cluster, nil) + defer cancel() + + executePullList := func(pullList []PullListItem) string { + var logbuf bytes.Buffer + logger := logrus.New() + logger.Out = &logbuf + router.keepstore.logger = logger + + listjson, err := json.Marshal(pullList) + c.Assert(err, IsNil) + resp := call(router, "PUT", "http://example/pull", s.cluster.SystemRootToken, listjson, nil) + c.Check(resp.Code, Equals, http.StatusOK) + for { + router.puller.cond.L.Lock() + todolen := len(router.puller.todo) + router.puller.cond.L.Unlock() + if todolen == 0 && router.puller.inprogress.Load() == 0 { + break + } + time.Sleep(time.Millisecond) } - s.readContent = testData.readContent - reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent)) - contentLength = int64(len(testData.readContent)) - return + return logbuf.String() } - // Override writePulledBlock to mock PutBlock functionality - defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock) - writePulledBlock = func(v Volume, content []byte, locator string) { - if testData.putError { - s.putError = errors.New("Error putting data") - return - } - s.putContent = content + newRemoteBlock := func(datastring string) string { + data := []byte(datastring) + hash := fmt.Sprintf("%x", md5.Sum(data)) + locator := fmt.Sprintf("%s+%d", hash, len(data)) + _, err := remoterouter.keepstore.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Hash: hash, + Data: data, + }) + c.Assert(err, IsNil) + return locator } - c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0)) - c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0)) - c.Check(getStatusItem("Version"), Not(Equals), "") - - response := IssueRequest(&testData.req) - c.Assert(response.Code, Equals, testData.responseCode) - c.Assert(response.Body.String(), Equals, testData.responseBody) - - expectEqualWithin(c, time.Second, 0, func() interface{} { - st := pullq.Status() - return st.InProgress + st.Queued - }) - - if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" { - c.Assert(len(s.testPullLists), Equals, 2) - c.Assert(len(processedPullLists), Equals, 1) - c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil) - c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil) - c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil) - } else { - if testData.responseCode == http.StatusOK { - c.Assert(len(s.testPullLists), Equals, 1) - c.Assert(len(processedPullLists), Equals, 1) - c.Assert(s.testPullLists[testData.name], NotNil) - } else { - c.Assert(len(s.testPullLists), Equals, 1) - c.Assert(len(processedPullLists), Equals, 0) - } + mounts := append([]*mount(nil), router.keepstore.mountsR...) + sort.Slice(mounts, func(i, j int) bool { return mounts[i].UUID < mounts[j].UUID }) + var vols []*stubVolume + for _, mount := range mounts { + vols = append(vols, mount.volume.(*stubVolume)) } - if testData.readError { - c.Assert(s.readError, NotNil) - } else if testData.responseCode == http.StatusOK { - c.Assert(s.readError, IsNil) - c.Assert(s.readContent, Equals, testData.readContent) - if testData.putError { - c.Assert(s.putError, NotNil) - } else { - c.Assert(s.putError, IsNil) - c.Assert(string(s.putContent), Equals, testData.readContent) - } - } + ctx := authContext(arvadostest.ActiveTokenV2) + + locator := newRemoteBlock("pull available block to unspecified volume") + executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + _, err := router.keepstore.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: router.keepstore.signLocator(arvadostest.ActiveTokenV2, locator), + WriteTo: io.Discard}) + c.Check(err, IsNil) - expectChannelEmpty(c, pullq.NextItem) + locator0 := newRemoteBlock("pull available block to specified volume 0") + locator1 := newRemoteBlock("pull available block to specified volume 1") + executePullList([]PullListItem{ + { + Locator: locator0, + Servers: []string{remoteserver.URL}, + MountUUID: vols[0].params.UUID}, + { + Locator: locator1, + Servers: []string{remoteserver.URL}, + MountUUID: vols[1].params.UUID}}) + c.Check(vols[0].data[locator0[:32]].data, NotNil) + c.Check(vols[1].data[locator1[:32]].data, NotNil) + + locator = fooHash + "+3" + logs := executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + c.Check(logs, Matches, ".*error pulling data from remote servers.*Block not found.*locator=acbd.*\n") + + locator = fooHash + "+3" + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{"http://0.0.0.0:9/"}}}) + c.Check(logs, Matches, ".*error pulling data from remote servers.*connection refused.*locator=acbd.*\n") + + locator = newRemoteBlock("log error writing to local volume") + vols[0].blockWrite = func(context.Context, string, []byte) error { return errors.New("test error") } + vols[1].blockWrite = vols[0].blockWrite + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + c.Check(logs, Matches, ".*error writing data to zzzzz-nyw5e-.*error=\"test error\".*locator=.*\n") + vols[0].blockWrite = nil + vols[1].blockWrite = nil + + locator = newRemoteBlock("log error when destination mount does not exist") + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}, + MountUUID: "bogus-mount-uuid"}}) + c.Check(logs, Matches, ".*ignoring pull list entry for nonexistent mount bogus-mount-uuid.*locator=.*\n") + + logs = executePullList([]PullListItem{}) + c.Logf("%s", logs) }