1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/sdk/go/arvados"
17 "git.arvados.org/arvados.git/sdk/go/keepclient"
18 "github.com/prometheus/client_golang/prometheus"
20 check "gopkg.in/check.v1"
23 var _ = Suite(&PullWorkerTestSuite{})
25 type PullWorkerTestSuite struct {
26 cluster *arvados.Cluster
29 testPullLists map[string]string
36 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
37 s.cluster = testCluster(c)
38 s.cluster.Volumes = map[string]arvados.Volume{
39 "zzzzz-nyw5e-000000000000000": {Driver: "mock"},
40 "zzzzz-nyw5e-111111111111111": {Driver: "mock"},
42 s.cluster.Collections.BlobReplicateConcurrency = 1
44 s.handler = &handler{}
45 c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
49 s.putContent = []byte{}
52 // When a new pull request arrives, the old one will be overwritten.
53 // performTest checks this with the testPullLists and processedPullLists maps
54 // for the "TestPullWorkerPullList_with_two_items_latest_replacing_old" case.
55 s.testPullLists = make(map[string]string)
58 var firstPullList = []byte(`[
60 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
66 "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
73 var secondPullList = []byte(`[
75 "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
83 type PullWorkerTestData struct {
93 // Ensure MountUUID in a pull list is correctly translated to a Volume
94 // argument passed to writePulledBlock().
95 func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
96 defer func(f func(*RRVolumeManager, Volume, []byte, string) error) {
99 pullq := s.handler.Handler.(*router).pullq
101 for _, spec := range []struct {
110 sendUUID: s.handler.volmgr.Mounts()[0].UUID,
111 expectVolume: s.handler.volmgr.Mounts()[0].Volume,
114 writePulledBlock = func(_ *RRVolumeManager, v Volume, _ []byte, _ string) error {
115 c.Check(v, Equals, spec.expectVolume)
119 resp := IssueRequest(s.handler, &RequestTester{
121 apiToken: s.cluster.SystemRootToken,
123 requestBody: []byte(`[{
124 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
125 "servers":["server_1","server_2"],
126 "mount_uuid":"` + spec.sendUUID + `"}]`),
128 c.Assert(resp.Code, Equals, http.StatusOK)
129 expectEqualWithin(c, time.Second, 0, func() interface{} {
131 return st.InProgress + st.Queued
136 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
137 testData := PullWorkerTestData{
138 name: "TestPullWorkerPullList_with_two_locators",
139 req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
140 responseCode: http.StatusOK,
141 responseBody: "Received 2 pull requests\n",
142 readContent: "hello",
147 s.performTest(testData, c)
150 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
151 testData := PullWorkerTestData{
152 name: "TestPullWorkerPullList_with_one_locator",
153 req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
154 responseCode: http.StatusOK,
155 responseBody: "Received 1 pull requests\n",
161 s.performTest(testData, c)
164 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
165 testData := PullWorkerTestData{
166 name: "TestPullWorker_error_on_get_one_locator",
167 req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
168 responseCode: http.StatusOK,
169 responseBody: "Received 1 pull requests\n",
170 readContent: "unused",
175 s.performTest(testData, c)
178 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
179 testData := PullWorkerTestData{
180 name: "TestPullWorker_error_on_get_two_locators",
181 req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
182 responseCode: http.StatusOK,
183 responseBody: "Received 2 pull requests\n",
184 readContent: "unused",
189 s.performTest(testData, c)
192 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
193 testData := PullWorkerTestData{
194 name: "TestPullWorker_error_on_put_one_locator",
195 req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
196 responseCode: http.StatusOK,
197 responseBody: "Received 1 pull requests\n",
198 readContent: "hello hello",
203 s.performTest(testData, c)
206 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
207 testData := PullWorkerTestData{
208 name: "TestPullWorker_error_on_put_two_locators",
209 req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
210 responseCode: http.StatusOK,
211 responseBody: "Received 2 pull requests\n",
212 readContent: "hello again",
217 s.performTest(testData, c)
220 // The request uses an invalid token and is rejected as unauthorized, so the item will not be placed on pullq.
221 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
222 testData := PullWorkerTestData{
223 name: "TestPullWorkerPullList_with_two_locators",
224 req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
225 responseCode: http.StatusUnauthorized,
226 responseBody: "Unauthorized\n",
227 readContent: "hello",
232 s.performTest(testData, c)
235 func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
236 pullq := s.handler.Handler.(*router).pullq
238 s.testPullLists[testData.name] = testData.responseBody
240 processedPullLists := make(map[string]string)
242 // Override GetContent to mock keepclient Get functionality
243 defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
246 GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
247 c.Assert(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(1))
248 processedPullLists[testData.name] = testData.responseBody
249 if testData.readError {
250 err = errors.New("Error getting data")
254 s.readContent = testData.readContent
255 reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
256 contentLength = int64(len(testData.readContent))
260 // Override writePulledBlock to mock PutBlock functionality
261 defer func(orig func(*RRVolumeManager, Volume, []byte, string) error) { writePulledBlock = orig }(writePulledBlock)
262 writePulledBlock = func(_ *RRVolumeManager, v Volume, content []byte, locator string) error {
263 if testData.putError {
264 s.putError = errors.New("Error putting data")
267 s.putContent = content
271 c.Check(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(0))
272 c.Check(getStatusItem(s.handler, "PullQueue", "Queued"), Equals, float64(0))
273 c.Check(getStatusItem(s.handler, "Version"), Not(Equals), "")
275 response := IssueRequest(s.handler, &testData.req)
276 c.Assert(response.Code, Equals, testData.responseCode)
277 c.Assert(response.Body.String(), Equals, testData.responseBody)
279 expectEqualWithin(c, time.Second, 0, func() interface{} {
281 return st.InProgress + st.Queued
284 if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
285 c.Assert(len(s.testPullLists), Equals, 2)
286 c.Assert(len(processedPullLists), Equals, 1)
287 c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
288 c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
289 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
291 if testData.responseCode == http.StatusOK {
292 c.Assert(len(s.testPullLists), Equals, 1)
293 c.Assert(len(processedPullLists), Equals, 1)
294 c.Assert(s.testPullLists[testData.name], NotNil)
296 c.Assert(len(s.testPullLists), Equals, 1)
297 c.Assert(len(processedPullLists), Equals, 0)
301 if testData.readError {
302 c.Assert(s.readError, NotNil)
303 } else if testData.responseCode == http.StatusOK {
304 c.Assert(s.readError, IsNil)
305 c.Assert(s.readContent, Equals, testData.readContent)
306 if testData.putError {
307 c.Assert(s.putError, NotNil)
309 c.Assert(s.putError, IsNil)
310 c.Assert(string(s.putContent), Equals, testData.readContent)
314 expectChannelEmpty(c, pullq.NextItem)