X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dfe0ec7bfec3fd72cd40d3962e5c8af08d2413d2..2c6557f613fcf6cdcebb08c321a5d061aeb780c6:/services/keepstore/pull_worker_test.go diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index 43a6de6844..d109b56df3 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -1,325 +1,136 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore import ( "bytes" + "context" + "crypto/md5" + "encoding/json" "errors" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - . "gopkg.in/check.v1" + "fmt" "io" "net/http" + "net/http/httptest" + "sort" "time" -) - -var _ = Suite(&PullWorkerTestSuite{}) - -type PullWorkerTestSuite struct{} - -var testPullLists map[string]string -var readContent string -var readError error -var putContent []byte -var putError error -var currentTestData PullWorkerTestData - -func (s *PullWorkerTestSuite) SetUpTest(c *C) { - readContent = "" - readError = nil - putContent = []byte("") - putError = nil - - // When a new pull request arrives, the old one will be overwritten. - // This behavior is verified using these two maps in the - // "TestPullWorkerPullList_with_two_items_latest_replacing_old" - testPullLists = make(map[string]string) -} - -// Since keepstore does not come into picture in tests, -// we need to explicitly start the goroutine in tests. -func RunTestPullWorker(c *C) { - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, Equals, nil) - keepClient, err := keepclient.MakeKeepClient(arv) - c.Assert(err, Equals, nil) - - pullq = NewWorkQueue() - go RunPullWorker(pullq, keepClient) -} - -var firstPullList = []byte(`[ - { - "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3", - "servers":[ - "server_1", - "server_2" - ] - },{ - "locator":"37b51d194a7513e45b56f6524f2d51f2+3", - "servers":[ - "server_3" - ] - } - ]`) - -var secondPullList = []byte(`[ - { - "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3", - "servers":[ - "server_1", - "server_2" - ] - } - ]`) - -type PullWorkerTestData struct { - name string - req RequestTester - responseCode int - responseBody string - readContent string - readError bool - putError bool -} - -func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) { - defer teardown() - - theConfig.systemAuthToken = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorkerPullList_with_two_locators", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, - responseCode: http.StatusOK, - responseBody: "Received 2 pull requests\n", - readContent: "hello", - readError: false, - putError: false, - } - - performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) { - defer teardown() - - theConfig.systemAuthToken = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorkerPullList_with_one_locator", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, - responseCode: http.StatusOK, - responseBody: "Received 1 pull requests\n", - readContent: "hola", - readError: false, - putError: false, - } - - performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) { - defer teardown() - - theConfig.systemAuthToken = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_get_one_locator", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, - responseCode: http.StatusOK, - responseBody: "Received 1 pull requests\n", - readContent: "unused", - readError: true, - putError: false, - } - - performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) { - defer teardown() - - theConfig.systemAuthToken = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_get_two_locators", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, - responseCode: http.StatusOK, - responseBody: "Received 2 pull requests\n", - readContent: "unused", - readError: true, - putError: false, - } - - performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) { - defer teardown() - - theConfig.systemAuthToken = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_put_one_locator", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, - responseCode: http.StatusOK, - responseBody: "Received 1 pull requests\n", - readContent: "hello hello", - readError: false, - putError: true, - } - - performTest(testData, c) -} - -func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) { - defer teardown() - - theConfig.systemAuthToken = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorker_error_on_put_two_locators", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, - responseCode: http.StatusOK, - responseBody: "Received 2 pull requests\n", - readContent: "hello again", - readError: false, - putError: true, - } - - performTest(testData, c) -} - -// When a new pull request arrives, the old one is replaced. This test -// is used to check that behavior by first putting an item on the queue, -// and then performing the test. Thus the "testPullLists" has two entries; -// however, processedPullLists will see only the newest item in the list. -func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) { - defer teardown() - - var firstInput = []int{1} - pullq = NewWorkQueue() - pullq.ReplaceQueue(makeTestWorkList(firstInput)) - testPullLists["Added_before_actual_test_item"] = string(1) - - theConfig.systemAuthToken = "DATA MANAGER TOKEN" - testData := PullWorkerTestData{ - name: "TestPullWorkerPullList_with_two_items_latest_replacing_old", - req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, - responseCode: http.StatusOK, - responseBody: "Received 1 pull requests\n", - readContent: "hola de nuevo", - readError: false, - putError: false, - } - performTest(testData, c) -} - -// In this case, the item will not be placed on pullq -func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) { - defer teardown() - - theConfig.systemAuthToken = "DATA MANAGER TOKEN" - - testData := PullWorkerTestData{ - name: "TestPullWorkerPullList_with_two_locators", - req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList}, - responseCode: http.StatusUnauthorized, - responseBody: "Unauthorized\n", - readContent: "hello", - readError: false, - putError: false, - } - - performTest(testData, c) -} - -func performTest(testData PullWorkerTestData, c *C) { - KeepVM = MakeTestVolumeManager(2) - defer KeepVM.Close() - - RunTestPullWorker(c) - defer pullq.Close() - - currentTestData = testData - testPullLists[testData.name] = testData.responseBody - - processedPullLists := make(map[string]string) + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadostest" + "github.com/sirupsen/logrus" + . "gopkg.in/check.v1" +) - // Override GetContent to mock keepclient Get functionality - defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) { - GetContent = orig - }(GetContent) - GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) { - c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1)) - processedPullLists[testData.name] = testData.responseBody - if testData.readError { - err = errors.New("Error getting data") - readError = err - return nil, 0, "", err - } - readContent = testData.readContent - cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)} - var rc io.ReadCloser - rc = cb - return rc, int64(len(testData.readContent)), "", nil +func (s *routerSuite) TestPullList_Execute(c *C) { + remotecluster := testCluster(c) + remotecluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-rrrrrrrrrrrrrrr": {Replication: 1, Driver: "stub"}, } - - // Override PutContent to mock PutBlock functionality - defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent) - PutContent = func(content []byte, locator string) (err error) { - if testData.putError { - err = errors.New("Error putting data") - putError = err - return err + remoterouter, cancel := testRouter(c, remotecluster, nil) + defer cancel() + remoteserver := httptest.NewServer(remoterouter) + defer remoteserver.Close() + + router, cancel := testRouter(c, s.cluster, nil) + defer cancel() + + executePullList := func(pullList []PullListItem) string { + var logbuf bytes.Buffer + logger := logrus.New() + logger.Out = &logbuf + router.keepstore.logger = logger + + listjson, err := json.Marshal(pullList) + c.Assert(err, IsNil) + resp := call(router, "PUT", "http://example/pull", s.cluster.SystemRootToken, listjson, nil) + c.Check(resp.Code, Equals, http.StatusOK) + for { + router.puller.cond.L.Lock() + todolen := len(router.puller.todo) + router.puller.cond.L.Unlock() + if todolen == 0 && router.puller.inprogress.Load() == 0 { + break + } + time.Sleep(time.Millisecond) } - putContent = content - return nil + return logbuf.String() } - c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0)) - c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0)) - - response := IssueRequest(&testData.req) - c.Assert(response.Code, Equals, testData.responseCode) - c.Assert(response.Body.String(), Equals, testData.responseBody) - - expectEqualWithin(c, time.Second, 0, func() interface{} { - st := pullq.Status() - return st.InProgress + st.Queued - }) - - if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" { - c.Assert(len(testPullLists), Equals, 2) - c.Assert(len(processedPullLists), Equals, 1) - c.Assert(testPullLists["Added_before_actual_test_item"], NotNil) - c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil) - c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil) - } else { - if testData.responseCode == http.StatusOK { - c.Assert(len(testPullLists), Equals, 1) - c.Assert(len(processedPullLists), Equals, 1) - c.Assert(testPullLists[testData.name], NotNil) - } else { - c.Assert(len(testPullLists), Equals, 1) - c.Assert(len(processedPullLists), Equals, 0) - } + newRemoteBlock := func(datastring string) string { + data := []byte(datastring) + hash := fmt.Sprintf("%x", md5.Sum(data)) + locator := fmt.Sprintf("%s+%d", hash, len(data)) + _, err := remoterouter.keepstore.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Hash: hash, + Data: data, + }) + c.Assert(err, IsNil) + return locator } - if testData.readError { - c.Assert(readError, NotNil) - } else if testData.responseCode == http.StatusOK { - c.Assert(readError, IsNil) - c.Assert(readContent, Equals, testData.readContent) - if testData.putError { - c.Assert(putError, NotNil) - } else { - c.Assert(putError, IsNil) - c.Assert(string(putContent), Equals, testData.readContent) - } + mounts := append([]*mount(nil), router.keepstore.mountsR...) + sort.Slice(mounts, func(i, j int) bool { return mounts[i].UUID < mounts[j].UUID }) + var vols []*stubVolume + for _, mount := range mounts { + vols = append(vols, mount.volume.(*stubVolume)) } - expectChannelEmpty(c, pullq.NextItem) -} + ctx := authContext(arvadostest.ActiveTokenV2) -type ClosingBuffer struct { - *bytes.Buffer -} + locator := newRemoteBlock("pull available block to unspecified volume") + executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + _, err := router.keepstore.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: router.keepstore.signLocator(arvadostest.ActiveTokenV2, locator), + WriteTo: io.Discard}) + c.Check(err, IsNil) -func (cb *ClosingBuffer) Close() (err error) { - return + locator0 := newRemoteBlock("pull available block to specified volume 0") + locator1 := newRemoteBlock("pull available block to specified volume 1") + executePullList([]PullListItem{ + { + Locator: locator0, + Servers: []string{remoteserver.URL}, + MountUUID: vols[0].params.UUID}, + { + Locator: locator1, + Servers: []string{remoteserver.URL}, + MountUUID: vols[1].params.UUID}}) + c.Check(vols[0].data[locator0[:32]].data, NotNil) + c.Check(vols[1].data[locator1[:32]].data, NotNil) + + locator = fooHash + "+3" + logs := executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + c.Check(logs, Matches, ".*error pulling data from remote servers.*Block not found.*locator=acbd.*\n") + + locator = fooHash + "+3" + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{"http://0.0.0.0:9/"}}}) + c.Check(logs, Matches, ".*error pulling data from remote servers.*connection refused.*locator=acbd.*\n") + + locator = newRemoteBlock("log error writing to local volume") + vols[0].blockWrite = func(context.Context, string, []byte) error { return errors.New("test error") } + vols[1].blockWrite = vols[0].blockWrite + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + c.Check(logs, Matches, ".*error writing data to zzzzz-nyw5e-.*error=\"test error\".*locator=.*\n") + vols[0].blockWrite = nil + vols[1].blockWrite = nil + + locator = newRemoteBlock("log error when destination mount does not exist") + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}, + MountUUID: "bogus-mount-uuid"}}) + c.Check(logs, Matches, ".*ignoring pull list entry for nonexistent mount bogus-mount-uuid.*locator=.*\n") + + logs = executePullList([]PullListItem{}) + c.Logf("%s", logs) }