1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
16 "git.curoverse.com/arvados.git/sdk/go/keepclient"
17 "github.com/prometheus/client_golang/prometheus"
21 var _ = Suite(&PullWorkerTestSuite{})
23 type PullWorkerTestSuite struct {
24 testPullLists map[string]string
31 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
32 theConfig.systemAuthToken = "arbitrary data manager token"
35 s.putContent = []byte{}
38 // When a new pull request arrives, the old one will be overwritten.
39 // This behavior is verified using these two maps in the
40 // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
41 s.testPullLists = make(map[string]string)
43 KeepVM = MakeTestVolumeManager(2)
45 // Normally the pull queue and workers are started by main()
46 // -- tests need to set up their own.
47 arv, err := arvadosclient.MakeArvadosClient()
49 keepClient, err := keepclient.MakeKeepClient(arv)
51 pullq = NewWorkQueue()
52 go RunPullWorker(pullq, keepClient)
55 func (s *PullWorkerTestSuite) TearDownTest(c *C) {
61 theConfig = DefaultConfig()
62 theConfig.Start(prometheus.NewRegistry())
65 var firstPullList = []byte(`[
67 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
73 "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
80 var secondPullList = []byte(`[
82 "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
90 type PullWorkerTestData struct {
100 // Ensure MountUUID in a pull list is correctly translated to a Volume
101 // argument passed to writePulledBlock().
102 func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
103 defer func(f func(Volume, []byte, string)) {
107 for _, spec := range []struct {
116 sendUUID: KeepVM.Mounts()[0].UUID,
117 expectVolume: KeepVM.Mounts()[0].volume,
120 writePulledBlock = func(v Volume, _ []byte, _ string) {
121 c.Check(v, Equals, spec.expectVolume)
124 resp := IssueRequest(&RequestTester{
126 apiToken: theConfig.systemAuthToken,
128 requestBody: []byte(`[{
129 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
130 "servers":["server_1","server_2"],
131 "mount_uuid":"` + spec.sendUUID + `"}]`),
133 c.Assert(resp.Code, Equals, http.StatusOK)
134 expectEqualWithin(c, time.Second, 0, func() interface{} {
136 return st.InProgress + st.Queued
141 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
142 testData := PullWorkerTestData{
143 name: "TestPullWorkerPullList_with_two_locators",
144 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
145 responseCode: http.StatusOK,
146 responseBody: "Received 2 pull requests\n",
147 readContent: "hello",
152 s.performTest(testData, c)
155 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
156 testData := PullWorkerTestData{
157 name: "TestPullWorkerPullList_with_one_locator",
158 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
159 responseCode: http.StatusOK,
160 responseBody: "Received 1 pull requests\n",
166 s.performTest(testData, c)
169 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
170 testData := PullWorkerTestData{
171 name: "TestPullWorker_error_on_get_one_locator",
172 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
173 responseCode: http.StatusOK,
174 responseBody: "Received 1 pull requests\n",
175 readContent: "unused",
180 s.performTest(testData, c)
183 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
184 testData := PullWorkerTestData{
185 name: "TestPullWorker_error_on_get_two_locators",
186 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
187 responseCode: http.StatusOK,
188 responseBody: "Received 2 pull requests\n",
189 readContent: "unused",
194 s.performTest(testData, c)
197 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
198 testData := PullWorkerTestData{
199 name: "TestPullWorker_error_on_put_one_locator",
200 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
201 responseCode: http.StatusOK,
202 responseBody: "Received 1 pull requests\n",
203 readContent: "hello hello",
208 s.performTest(testData, c)
211 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
212 testData := PullWorkerTestData{
213 name: "TestPullWorker_error_on_put_two_locators",
214 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
215 responseCode: http.StatusOK,
216 responseBody: "Received 2 pull requests\n",
217 readContent: "hello again",
222 s.performTest(testData, c)
225 // In this case, the item will not be placed on pullq
226 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
227 testData := PullWorkerTestData{
228 name: "TestPullWorkerPullList_with_two_locators",
229 req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
230 responseCode: http.StatusUnauthorized,
231 responseBody: "Unauthorized\n",
232 readContent: "hello",
237 s.performTest(testData, c)
240 func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
241 s.testPullLists[testData.name] = testData.responseBody
243 processedPullLists := make(map[string]string)
245 // Override GetContent to mock keepclient Get functionality
246 defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
249 GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
250 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
251 processedPullLists[testData.name] = testData.responseBody
252 if testData.readError {
253 err = errors.New("Error getting data")
257 s.readContent = testData.readContent
258 reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
259 contentLength = int64(len(testData.readContent))
263 // Override writePulledBlock to mock PutBlock functionality
264 defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
265 writePulledBlock = func(v Volume, content []byte, locator string) {
266 if testData.putError {
267 s.putError = errors.New("Error putting data")
270 s.putContent = content
273 c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
274 c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
275 c.Check(getStatusItem("Version"), Not(Equals), "")
277 response := IssueRequest(&testData.req)
278 c.Assert(response.Code, Equals, testData.responseCode)
279 c.Assert(response.Body.String(), Equals, testData.responseBody)
281 expectEqualWithin(c, time.Second, 0, func() interface{} {
283 return st.InProgress + st.Queued
286 if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
287 c.Assert(len(s.testPullLists), Equals, 2)
288 c.Assert(len(processedPullLists), Equals, 1)
289 c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
290 c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
291 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
293 if testData.responseCode == http.StatusOK {
294 c.Assert(len(s.testPullLists), Equals, 1)
295 c.Assert(len(processedPullLists), Equals, 1)
296 c.Assert(s.testPullLists[testData.name], NotNil)
298 c.Assert(len(s.testPullLists), Equals, 1)
299 c.Assert(len(processedPullLists), Equals, 0)
303 if testData.readError {
304 c.Assert(s.readError, NotNil)
305 } else if testData.responseCode == http.StatusOK {
306 c.Assert(s.readError, IsNil)
307 c.Assert(s.readContent, Equals, testData.readContent)
308 if testData.putError {
309 c.Assert(s.putError, NotNil)
311 c.Assert(s.putError, IsNil)
312 c.Assert(string(s.putContent), Equals, testData.readContent)
316 expectChannelEmpty(c, pullq.NextItem)