X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/df9e166a5ffc4aa79658bec1a5d552a3b413f0d8..edcdf0ae72c56bf4aa05f93ed2877faa3a5e75c4:/services/keepstore/pull_worker_test.go diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index f0e9e65f1e..d109b56df3 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -1,337 +1,136 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore import ( "bytes" + "context" + "crypto/md5" + "encoding/json" "errors" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - . "gopkg.in/check.v1" + "fmt" "io" "net/http" - "testing" -) - -type PullWorkerTestSuite struct{} - -// Gocheck boilerplate -func TestPullWorker(t *testing.T) { - TestingT(t) -} - -// Gocheck boilerplate -var _ = Suite(&PullWorkerTestSuite{}) - -var testPullLists map[string]string -var processedPullLists map[string]string -var readContent string -var readError error -var putContent []byte -var putError error -var currentTestData PullWorkerTestData - -func (s *PullWorkerTestSuite) SetUpTest(c *C) { - readContent = "" - readError = nil - putContent = []byte("") - putError = nil - - // When a new pull request arrives, the old one will be overwritten. - // This behavior is verified using these two maps in the - // "TestPullWorker_pull_list_with_two_items_latest_replacing_old" - testPullLists = make(map[string]string) - processedPullLists = make(map[string]string) -} - -// Since keepstore does not come into picture in tests, -// we need to explicitly start the goroutine in tests. -func RunTestPullWorker(c *C) { - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) - keepClient, err := keepclient.MakeKeepClient(&arv) - c.Assert(err, Equals, nil) - - pullq = NewWorkQueue() - go RunPullWorker(pullq, keepClient) -} - -var first_pull_list = []byte(`[ - { - "locator":"locator1", - "servers":[ - "server_1", - "server_2" - ] - }, - { - "locator":"locator2", - "servers":[ - "server_3" - ] - } - ]`) - -var second_pull_list = []byte(`[ - { - "locator":"locator3", - "servers":[ - "server_1", - "server_2" - ] - } - ]`) - -type PullWorkerTestData struct { - name string - req RequestTester - response_code int - response_body string - read_content string - read_error bool - put_error bool -} - -func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) { - defer teardown() - - data_manager_token = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_pull_list_with_two_locators", - req: RequestTester{"/pull", data_manager_token, "PUT", first_pull_list}, - response_code: http.StatusOK, - response_body: "Received 2 pull requests\n", - read_content: "hello", - read_error: false, - put_error: false, - } - - performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) { - defer teardown() - - data_manager_token = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_pull_list_with_one_locator", - req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list}, - response_code: http.StatusOK, - response_body: "Received 1 pull requests\n", - read_content: "hola", - read_error: false, - put_error: false, - } - - performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) { - defer teardown() - - data_manager_token = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_get_one_locator", - req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list}, - response_code: http.StatusOK, - response_body: "Received 1 pull requests\n", - read_content: "unused", - read_error: true, - put_error: false, - } - - performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) { - defer teardown() - - data_manager_token = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_get_two_locators", - req: RequestTester{"/pull", data_manager_token, "PUT", first_pull_list}, - response_code: http.StatusOK, - response_body: "Received 2 pull requests\n", - read_content: "unused", - read_error: true, - put_error: false, - } - - performTest(testData, c) -} + "net/http/httptest" + "sort" + "time" -func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) { - defer teardown() - - data_manager_token = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_put_one_locator", - req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list}, - response_code: http.StatusOK, - response_body: "Received 1 pull requests\n", - read_content: "hello hello", - read_error: false, - put_error: true, - } - - performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) { - defer teardown() - - data_manager_token = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_put_two_locators", - req: RequestTester{"/pull", data_manager_token, "PUT", first_pull_list}, - response_code: http.StatusOK, - response_body: "Received 2 pull requests\n", - read_content: "hello again", - read_error: false, - put_error: true, - } - - performTest(testData, c) -} - -// When a new pull request arrives, the old one is replaced. This test -// is used to check that behavior by first putting an item on the queue, -// and then performing the test. Thus the "testPullLists" has two entries; -// however, processedPullLists will see only the newest item in the list. -func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) { - defer teardown() - - var firstInput = []int{1} - pullq = NewWorkQueue() - pullq.ReplaceQueue(makeTestWorkList(firstInput)) - testPullLists["Added_before_actual_test_item"] = string(1) - - data_manager_token = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_pull_list_with_two_items_latest_replacing_old", - req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list}, - response_code: http.StatusOK, - response_body: "Received 1 pull requests\n", - read_content: "hola de nuevo", - read_error: false, - put_error: false, - } - - performTest(testData, c) -} - -// In this case, the item will not be placed on pullq -func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) { - defer teardown() - - data_manager_token = "DATA MANAGER TOKEN" - - testData := PullWorkerTestData{ - name: "TestPullWorker_pull_list_with_two_locators", - req: RequestTester{"/pull", "invalid_data_manager_token", "PUT", first_pull_list}, - response_code: http.StatusUnauthorized, - response_body: "Unauthorized\n", - read_content: "hello", - read_error: false, - put_error: false, - } - - performTest(testData, c) -} - -func performTest(testData PullWorkerTestData, c *C) { - RunTestPullWorker(c) - - currentTestData = testData - testPullLists[testData.name] = testData.response_body - - // Override GetContent to mock keepclient Get functionality - GetContent = func(signedLocator string, keepClient keepclient.KeepClient) ( - reader io.ReadCloser, contentLength int64, url string, err error) { + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadostest" + "github.com/sirupsen/logrus" + . "gopkg.in/check.v1" +) - processedPullLists[testData.name] = testData.response_body - if testData.read_error { - err = errors.New("Error getting data") - readError = err - return nil, 0, "", err - } else { - readContent = testData.read_content - cb := &ClosingBuffer{bytes.NewBufferString(testData.read_content)} - var rc io.ReadCloser - rc = cb - return rc, int64(len(testData.read_content)), "", nil - } +func (s *routerSuite) TestPullList_Execute(c *C) { + remotecluster := testCluster(c) + remotecluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-rrrrrrrrrrrrrrr": {Replication: 1, Driver: "stub"}, } - - // Override PutContent to mock PutBlock functionality - PutContent = func(content []byte, locator string) (err error) { - if testData.put_error { - err = errors.New("Error putting data") - putError = err - return err - } else { - putContent = content - return nil + remoterouter, cancel := testRouter(c, remotecluster, nil) + defer cancel() + remoteserver := httptest.NewServer(remoterouter) + defer remoteserver.Close() + + router, cancel := testRouter(c, s.cluster, nil) + defer cancel() + + executePullList := func(pullList []PullListItem) string { + var logbuf bytes.Buffer + logger := logrus.New() + logger.Out = &logbuf + router.keepstore.logger = logger + + listjson, err := json.Marshal(pullList) + c.Assert(err, IsNil) + resp := call(router, "PUT", "http://example/pull", s.cluster.SystemRootToken, listjson, nil) + c.Check(resp.Code, Equals, http.StatusOK) + for { + router.puller.cond.L.Lock() + todolen := len(router.puller.todo) + router.puller.cond.L.Unlock() + if todolen == 0 && router.puller.inprogress.Load() == 0 { + break + } + time.Sleep(time.Millisecond) } + return logbuf.String() } - response := IssueRequest(&testData.req) - c.Assert(testData.response_code, Equals, response.Code) - c.Assert(testData.response_body, Equals, response.Body.String()) - - expectWorkerChannelEmpty(c, pullq.NextItem) - - pullq.Close() - - if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" { - c.Assert(len(testPullLists), Equals, 2) - c.Assert(len(processedPullLists), Equals, 1) - c.Assert(testPullLists["Added_before_actual_test_item"], NotNil) - c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil) - c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil) - } else { - if testData.response_code == http.StatusOK { - c.Assert(len(testPullLists), Equals, 1) - c.Assert(len(processedPullLists), Equals, 1) - c.Assert(testPullLists[testData.name], NotNil) - } else { - c.Assert(len(testPullLists), Equals, 1) - c.Assert(len(processedPullLists), Equals, 0) - } + newRemoteBlock := func(datastring string) string { + data := []byte(datastring) + hash := fmt.Sprintf("%x", md5.Sum(data)) + locator := fmt.Sprintf("%s+%d", hash, len(data)) + _, err := remoterouter.keepstore.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Hash: hash, + Data: data, + }) + c.Assert(err, IsNil) + return locator } - if testData.read_error { - c.Assert(readError, NotNil) - } else if testData.response_code == http.StatusOK { - c.Assert(readError, IsNil) - c.Assert(readContent, Equals, testData.read_content) - if testData.put_error { - c.Assert(putError, NotNil) - } else { - c.Assert(putError, IsNil) - c.Assert(string(putContent), Equals, testData.read_content) - } + mounts := append([]*mount(nil), router.keepstore.mountsR...) + sort.Slice(mounts, func(i, j int) bool { return mounts[i].UUID < mounts[j].UUID }) + var vols []*stubVolume + for _, mount := range mounts { + vols = append(vols, mount.volume.(*stubVolume)) } -} -type ClosingBuffer struct { - *bytes.Buffer -} - -func (cb *ClosingBuffer) Close() (err error) { - return -} + ctx := authContext(arvadostest.ActiveTokenV2) -func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) { - select { - case item := <-workerChannel: - c.Fatalf("Received value (%v) from channel that was expected to be empty", item) - default: - } -} + locator := newRemoteBlock("pull available block to unspecified volume") + executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + _, err := router.keepstore.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: router.keepstore.signLocator(arvadostest.ActiveTokenV2, locator), + WriteTo: io.Discard}) + c.Check(err, IsNil) -func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) { - select { - case item := <-workerChannel: - c.Fatalf("Received value (%v) from channel that was expected to be empty", item) - default: - } + locator0 := newRemoteBlock("pull available block to specified volume 0") + locator1 := newRemoteBlock("pull available block to specified volume 1") + executePullList([]PullListItem{ + { + Locator: locator0, + Servers: []string{remoteserver.URL}, + MountUUID: vols[0].params.UUID}, + { + Locator: locator1, + Servers: []string{remoteserver.URL}, + MountUUID: vols[1].params.UUID}}) + c.Check(vols[0].data[locator0[:32]].data, NotNil) + c.Check(vols[1].data[locator1[:32]].data, NotNil) + + locator = fooHash + "+3" + logs := executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + c.Check(logs, Matches, ".*error pulling data from remote servers.*Block not found.*locator=acbd.*\n") + + locator = fooHash + "+3" + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{"http://0.0.0.0:9/"}}}) + c.Check(logs, Matches, ".*error pulling data from remote servers.*connection refused.*locator=acbd.*\n") + + locator = newRemoteBlock("log error writing to local volume") + vols[0].blockWrite = func(context.Context, string, []byte) error { return errors.New("test error") } + vols[1].blockWrite = vols[0].blockWrite + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + c.Check(logs, Matches, ".*error writing data to zzzzz-nyw5e-.*error=\"test error\".*locator=.*\n") + vols[0].blockWrite = nil + vols[1].blockWrite = nil + + locator = newRemoteBlock("log error when destination mount does not exist") + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}, + MountUUID: "bogus-mount-uuid"}}) + c.Check(logs, Matches, ".*ignoring pull list entry for nonexistent mount bogus-mount-uuid.*locator=.*\n") + + logs = executePullList([]PullListItem{}) + c.Logf("%s", logs) }