1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
16 "git.curoverse.com/arvados.git/sdk/go/keepclient"
20 var _ = Suite(&PullWorkerTestSuite{})
22 type PullWorkerTestSuite struct {
23 testPullLists map[string]string
30 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
31 theConfig.systemAuthToken = "arbitrary data manager token"
34 s.putContent = []byte{}
37 // When a new pull request arrives, the old one will be overwritten.
38 // This behavior is verified using the testPullLists and processedPullLists
39 // maps in the "TestPullWorkerPullList_with_two_items_latest_replacing_old" case.
40 s.testPullLists = make(map[string]string)
42 KeepVM = MakeTestVolumeManager(2)
44 // Normally the pull queue and workers are started by main()
45 // -- tests need to set up their own.
46 arv, err := arvadosclient.MakeArvadosClient()
48 keepClient, err := keepclient.MakeKeepClient(arv)
50 pullq = NewWorkQueue()
51 go RunPullWorker(pullq, keepClient)
54 func (s *PullWorkerTestSuite) TearDownTest(c *C) {
60 theConfig = DefaultConfig()
64 var firstPullList = []byte(`[
66 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
72 "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
79 var secondPullList = []byte(`[
81 "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
89 type PullWorkerTestData struct {
99 // Ensure MountUUID in a pull list is correctly translated to a Volume
100 // argument passed to writePulledBlock().
101 func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
102 defer func(f func(Volume, []byte, string)) {
106 for _, spec := range []struct {
115 sendUUID: KeepVM.Mounts()[0].UUID,
116 expectVolume: KeepVM.Mounts()[0].volume,
119 writePulledBlock = func(v Volume, _ []byte, _ string) {
120 c.Check(v, Equals, spec.expectVolume)
123 resp := IssueRequest(&RequestTester{
125 apiToken: theConfig.systemAuthToken,
127 requestBody: []byte(`[{
128 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
129 "servers":["server_1","server_2"],
130 "mountuuid":"` + spec.sendUUID + `"}]`),
132 c.Assert(resp.Code, Equals, http.StatusOK)
133 expectEqualWithin(c, time.Second, 0, func() interface{} {
135 return st.InProgress + st.Queued
140 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
141 testData := PullWorkerTestData{
142 name: "TestPullWorkerPullList_with_two_locators",
143 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
144 responseCode: http.StatusOK,
145 responseBody: "Received 2 pull requests\n",
146 readContent: "hello",
151 s.performTest(testData, c)
154 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
155 testData := PullWorkerTestData{
156 name: "TestPullWorkerPullList_with_one_locator",
157 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
158 responseCode: http.StatusOK,
159 responseBody: "Received 1 pull requests\n",
165 s.performTest(testData, c)
168 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
169 testData := PullWorkerTestData{
170 name: "TestPullWorker_error_on_get_one_locator",
171 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
172 responseCode: http.StatusOK,
173 responseBody: "Received 1 pull requests\n",
174 readContent: "unused",
179 s.performTest(testData, c)
182 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
183 testData := PullWorkerTestData{
184 name: "TestPullWorker_error_on_get_two_locators",
185 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
186 responseCode: http.StatusOK,
187 responseBody: "Received 2 pull requests\n",
188 readContent: "unused",
193 s.performTest(testData, c)
196 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
197 testData := PullWorkerTestData{
198 name: "TestPullWorker_error_on_put_one_locator",
199 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
200 responseCode: http.StatusOK,
201 responseBody: "Received 1 pull requests\n",
202 readContent: "hello hello",
207 s.performTest(testData, c)
210 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
211 testData := PullWorkerTestData{
212 name: "TestPullWorker_error_on_put_two_locators",
213 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
214 responseCode: http.StatusOK,
215 responseBody: "Received 2 pull requests\n",
216 readContent: "hello again",
221 s.performTest(testData, c)
224 // An invalid token is rejected with 401 Unauthorized, so the item will not be placed on pullq.
225 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
226 testData := PullWorkerTestData{
227 name: "TestPullWorker_invalidToken",
228 req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
229 responseCode: http.StatusUnauthorized,
230 responseBody: "Unauthorized\n",
231 readContent: "hello",
236 s.performTest(testData, c)
239 func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
240 s.testPullLists[testData.name] = testData.responseBody
242 processedPullLists := make(map[string]string)
244 // Override GetContent to mock keepclient Get functionality
245 defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
248 GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
249 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
250 processedPullLists[testData.name] = testData.responseBody
251 if testData.readError {
252 err = errors.New("Error getting data")
256 s.readContent = testData.readContent
257 reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
258 contentLength = int64(len(testData.readContent))
262 // Override writePulledBlock to mock PutBlock functionality
263 defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
264 writePulledBlock = func(v Volume, content []byte, locator string) {
265 if testData.putError {
266 s.putError = errors.New("Error putting data")
269 s.putContent = content
272 c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
273 c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
275 response := IssueRequest(&testData.req)
276 c.Assert(response.Code, Equals, testData.responseCode)
277 c.Assert(response.Body.String(), Equals, testData.responseBody)
279 expectEqualWithin(c, time.Second, 0, func() interface{} {
281 return st.InProgress + st.Queued
284 if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
285 c.Assert(len(s.testPullLists), Equals, 2)
286 c.Assert(len(processedPullLists), Equals, 1)
287 c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
288 c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
289 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
291 if testData.responseCode == http.StatusOK {
292 c.Assert(len(s.testPullLists), Equals, 1)
293 c.Assert(len(processedPullLists), Equals, 1)
294 c.Assert(s.testPullLists[testData.name], NotNil)
296 c.Assert(len(s.testPullLists), Equals, 1)
297 c.Assert(len(processedPullLists), Equals, 0)
301 if testData.readError {
302 c.Assert(s.readError, NotNil)
303 } else if testData.responseCode == http.StatusOK {
304 c.Assert(s.readError, IsNil)
305 c.Assert(s.readContent, Equals, testData.readContent)
306 if testData.putError {
307 c.Assert(s.putError, NotNil)
309 c.Assert(s.putError, IsNil)
310 c.Assert(string(s.putContent), Equals, testData.readContent)
314 expectChannelEmpty(c, pullq.NextItem)