X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/753d0ef652e1593c0fbd204a3973554794215c2f..edcdf0ae72c56bf4aa05f93ed2877faa3a5e75c4:/services/keepstore/pull_worker_test.go diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index 8e6241ff82..d109b56df3 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -1,134 +1,136 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore import ( "bytes" + "context" + "crypto/md5" + "encoding/json" "errors" + "fmt" "io" "net/http" - "testing" + "net/http/httptest" + "sort" "time" + + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadostest" + "github.com/sirupsen/logrus" + . "gopkg.in/check.v1" ) -func TestPullWorker(t *testing.T) { - defer teardown() +func (s *routerSuite) TestPullList_Execute(c *C) { + remotecluster := testCluster(c) + remotecluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-rrrrrrrrrrrrrrr": {Replication: 1, Driver: "stub"}, + } + remoterouter, cancel := testRouter(c, remotecluster, nil) + defer cancel() + remoteserver := httptest.NewServer(remoterouter) + defer remoteserver.Close() - // Since keepstore does not come into picture in tests, - // we need to explicitly start the goroutine in tests. - go RunPullWorker(pullq.NextItem) + router, cancel := testRouter(c, s.cluster, nil) + defer cancel() - data_manager_token = "DATA MANAGER TOKEN" + executePullList := func(pullList []PullListItem) string { + var logbuf bytes.Buffer + logger := logrus.New() + logger.Out = &logbuf + router.keepstore.logger = logger - first_pull_list := []byte(`[ - { - "locator":"locator1_to_verify_first_pull_list", - "servers":[ - "server_1", - "server_2" - ] - }, - { - "locator":"locator2_to_verify_first_pull_list", - "servers":[ - "server_3" - ] + listjson, err := json.Marshal(pullList) + c.Assert(err, IsNil) + resp := call(router, "PUT", "http://example/pull", s.cluster.SystemRootToken, listjson, nil) + c.Check(resp.Code, Equals, http.StatusOK) + for { + router.puller.cond.L.Lock() + todolen := len(router.puller.todo) + router.puller.cond.L.Unlock() + if todolen == 0 && router.puller.inprogress.Load() == 0 { + break + } + time.Sleep(time.Millisecond) } - ]`) + return logbuf.String() + } - second_pull_list := []byte(`[ - { - "locator":"locator_to_verify_second_pull_list", - "servers":[ - "server_1", - "server_2" - ] - } - ]`) - - type PullWorkerTestData struct { - name string - req RequestTester - response_code int - response_body string - read_content string - read_error bool - put_error bool + newRemoteBlock := func(datastring string) string { + data := []byte(datastring) + hash := fmt.Sprintf("%x", md5.Sum(data)) + locator := fmt.Sprintf("%s+%d", hash, len(data)) + _, err := remoterouter.keepstore.BlockWrite(context.Background(), arvados.BlockWriteOptions{ + Hash: hash, + Data: data, + }) + c.Assert(err, IsNil) + return locator } - var testcases = []PullWorkerTestData{ - { - "Pull request 1 from the data manager in worker", - RequestTester{"/pull", data_manager_token, "PUT", first_pull_list}, - http.StatusOK, - "Received 2 pull requests\n", - "hello", - false, - false, - }, - { - "Pull request 2 from the data manager in worker", - RequestTester{"/pull", data_manager_token, "PUT", second_pull_list}, - http.StatusOK, - "Received 1 pull requests\n", - "hola", - false, - false, - }, - { - "Pull request with error on get", - RequestTester{"/pull", data_manager_token, "PUT", second_pull_list}, - http.StatusOK, - "Received 1 pull requests\n", - "unused", - true, - false, - }, - { - "Pull request with error on put", - RequestTester{"/pull", data_manager_token, "PUT", second_pull_list}, - http.StatusOK, - "Received 1 pull requests\n", - "unused", - false, - true, - }, + + mounts := append([]*mount(nil), router.keepstore.mountsR...) + sort.Slice(mounts, func(i, j int) bool { return mounts[i].UUID < mounts[j].UUID }) + var vols []*stubVolume + for _, mount := range mounts { + vols = append(vols, mount.volume.(*stubVolume)) } - for _, testData := range testcases { - // Override GetContent to mock keepclient functionality - GetContent = func(signedLocator string) (reader io.ReadCloser, contentLength int64, url string, err error) { - if testData.read_error { - return nil, 0, "", errors.New("Error getting data") - } else { - cb := &ClosingBuffer{bytes.NewBufferString("Hi!")} - var rc io.ReadCloser - rc = cb - return rc, 3, "", nil - } - } + ctx := authContext(arvadostest.ActiveTokenV2) - // Override PutContent to mock PutBlock functionality - PutContent = func(content []byte, locator string) (err error) { - if testData.put_error { - return errors.New("Error putting data") - } else { - return nil - } - } + locator := newRemoteBlock("pull available block to unspecified volume") + executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + _, err := router.keepstore.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: router.keepstore.signLocator(arvadostest.ActiveTokenV2, locator), + WriteTo: io.Discard}) + c.Check(err, IsNil) - response := IssueRequest(&testData.req) - ExpectStatusCode(t, testData.name, testData.response_code, response) - ExpectBody(t, testData.name, testData.response_body, response) + locator0 := newRemoteBlock("pull available block to specified volume 0") + locator1 := newRemoteBlock("pull available block to specified volume 1") + executePullList([]PullListItem{ + { + Locator: locator0, + Servers: []string{remoteserver.URL}, + MountUUID: vols[0].params.UUID}, + { + Locator: locator1, + Servers: []string{remoteserver.URL}, + MountUUID: vols[1].params.UUID}}) + c.Check(vols[0].data[locator0[:32]].data, NotNil) + c.Check(vols[1].data[locator1[:32]].data, NotNil) - // give the channel a second to read and process all pull list entries - time.Sleep(1000 * time.Millisecond) + locator = fooHash + "+3" + logs := executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + c.Check(logs, Matches, ".*error pulling data from remote servers.*Block not found.*locator=acbd.*\n") - expectChannelEmpty(t, pullq.NextItem) - } -} + locator = fooHash + "+3" + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{"http://0.0.0.0:9/"}}}) + c.Check(logs, Matches, ".*error pulling data from remote servers.*connection refused.*locator=acbd.*\n") -type ClosingBuffer struct { - *bytes.Buffer -} + locator = newRemoteBlock("log error writing to local volume") + vols[0].blockWrite = func(context.Context, string, []byte) error { return errors.New("test error") } + vols[1].blockWrite = vols[0].blockWrite + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}}}) + c.Check(logs, Matches, ".*error writing data to zzzzz-nyw5e-.*error=\"test error\".*locator=.*\n") + vols[0].blockWrite = nil + vols[1].blockWrite = nil + + locator = newRemoteBlock("log error when destination mount does not exist") + logs = executePullList([]PullListItem{{ + Locator: locator, + Servers: []string{remoteserver.URL}, + MountUUID: "bogus-mount-uuid"}}) + c.Check(logs, Matches, ".*ignoring pull list entry for nonexistent mount bogus-mount-uuid.*locator=.*\n") -func (cb *ClosingBuffer) Close() (err error) { - return + logs = executePullList([]PullListItem{}) + c.Logf("%s", logs) }