10 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// Hand the suite to Suite at package initialisation.
// NOTE(review): presumably this registers it with a gocheck-style runner — confirm.
15 var _ = Suite(&PullWorkerTestSuite{})
// PullWorkerTestSuite is the stateless receiver for the pull worker tests below.
17 type PullWorkerTestSuite struct{}
// testPullLists maps a test name to that test's expected response body;
// performTest adds an entry per run and checks the number of entries.
19 var testPullLists map[string]string
// readContent records the content the mocked GetContent in performTest last served.
20 var readContent string
// currentTestData is the test case most recently handed to performTest.
24 var currentTestData PullWorkerTestData
// SetUpTest resets package-level state shared between tests: it sets
// putContent to an empty byte slice and replaces testPullLists with a new map.
// NOTE(review): presumably invoked by the test framework before each test — confirm.
26 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
29 putContent = []byte("")
32 // When a new pull request arrives, the old one is overwritten.
33 // performTest verifies this with testPullLists and processedPullLists in the
34 // TestPullWorkerPullList_with_two_items_latest_replacing_old test.
35 testPullLists = make(map[string]string)
38 // RunTestPullWorker starts the pull worker for a test. Since keepstore itself
39 // is not started in tests, the worker goroutine must be started explicitly.
// It builds an Arvados client and a keep client, asserting that neither step
// returns an error, installs a new work queue in pullq, and runs
// RunPullWorker on that queue in a new goroutine.
// NOTE(review): nothing visible here stops that goroutine — confirm how it ends.
40 func RunTestPullWorker(c *C) {
41 arv, err := arvadosclient.MakeArvadosClient()
42 c.Assert(err, Equals, nil)
43 keepClient, err := keepclient.MakeKeepClient(arv)
44 c.Assert(err, Equals, nil)
// Replace the package-level queue so the worker starts from an empty one.
46 pullq = NewWorkQueue()
47 go RunPullWorker(pullq, keepClient)
// firstPullList is a pull list request body naming two locators. The tests
// PUT it to /pull when they expect the "Received 2 pull requests" response.
50 var firstPullList = []byte(`[
52 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
58 "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
// secondPullList is a pull list request body naming a single locator. The
// tests PUT it to /pull when they expect the "Received 1 pull requests" response.
65 var secondPullList = []byte(`[
67 "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
// PullWorkerTestData describes one pull worker test case as consumed by
// performTest: a name used as the bookkeeping map key, the request to issue
// (req), the expected responseCode and responseBody, the readContent served
// by the mocked GetContent, and the readError / putError flags that make the
// mocked get or put report a failure.
75 type PullWorkerTestData struct {
// TestPullWorkerPullList_with_two_locators PUTs firstPullList to /pull with
// the configured system auth token and expects 200 OK with the
// "Received 2 pull requests" body; performTest issues the request and checks it.
85 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
// The request below presents this same token.
88 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
89 testData := PullWorkerTestData{
90 name: "TestPullWorkerPullList_with_two_locators",
91 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
92 responseCode: http.StatusOK,
93 responseBody: "Received 2 pull requests\n",
99 performTest(testData, c)
// TestPullWorkerPullList_with_one_locator PUTs secondPullList to /pull with
// the configured system auth token and expects 200 OK with the
// "Received 1 pull requests" body; performTest issues the request and checks it.
102 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
// The request below presents this same token.
105 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
106 testData := PullWorkerTestData{
107 name: "TestPullWorkerPullList_with_one_locator",
108 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
109 responseCode: http.StatusOK,
110 responseBody: "Received 1 pull requests\n",
116 performTest(testData, c)
// TestPullWorker_error_on_get_one_locator is named for the case where fetching
// the block fails. It PUTs secondPullList to /pull; the request itself is
// still expected to be accepted with 200 OK and "Received 1 pull requests".
119 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
// The request below presents this same token.
122 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
123 testData := PullWorkerTestData{
124 name: "TestPullWorker_error_on_get_one_locator",
125 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
126 responseCode: http.StatusOK,
127 responseBody: "Received 1 pull requests\n",
// NOTE(review): presumably never served because the mocked get fails first — confirm.
128 readContent: "unused",
133 performTest(testData, c)
// TestPullWorker_error_on_get_two_locators is named for the case where
// fetching the blocks fails. It PUTs firstPullList to /pull; the request itself
// is still expected to be accepted with 200 OK and "Received 2 pull requests".
136 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
// The request below presents this same token.
139 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
140 testData := PullWorkerTestData{
141 name: "TestPullWorker_error_on_get_two_locators",
142 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
143 responseCode: http.StatusOK,
144 responseBody: "Received 2 pull requests\n",
// NOTE(review): presumably never served because the mocked get fails first — confirm.
145 readContent: "unused",
150 performTest(testData, c)
// TestPullWorker_error_on_put_one_locator is named for the case where writing
// the pulled block fails. It PUTs secondPullList to /pull; the request itself
// is still expected to be accepted with 200 OK and "Received 1 pull requests".
153 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
// The request below presents this same token.
156 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
157 testData := PullWorkerTestData{
158 name: "TestPullWorker_error_on_put_one_locator",
159 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
160 responseCode: http.StatusOK,
161 responseBody: "Received 1 pull requests\n",
// Content the mocked GetContent serves for the pulled block.
162 readContent: "hello hello",
167 performTest(testData, c)
// TestPullWorker_error_on_put_two_locators is named for the case where writing
// the pulled blocks fails. It PUTs firstPullList to /pull; the request itself
// is still expected to be accepted with 200 OK and "Received 2 pull requests".
170 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
// The request below presents this same token.
173 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
174 testData := PullWorkerTestData{
175 name: "TestPullWorker_error_on_put_two_locators",
176 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
177 responseCode: http.StatusOK,
178 responseBody: "Received 2 pull requests\n",
// Content the mocked GetContent serves for the pulled blocks.
179 readContent: "hello again",
184 performTest(testData, c)
187 // When a new pull request arrives, the old one is replaced. This test
188 // is used to check that behavior by first putting an item on the queue,
189 // and then performing the test. Thus the "testPullLists" has two entries;
190 // however, processedPullLists will see only the newest item in the list.
191 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
// Pre-load a new queue with one work item before the real request is issued.
194 var firstInput = []int{1}
195 pullq = NewWorkQueue()
196 pullq.ReplaceQueue(makeTestWorkList(firstInput))
// Record the pre-loaded item so testPullLists ends up with two entries.
// NOTE(review): string(1) converts the integer to the one-rune string "\x01",
// not "1". Only the entry's presence is checked later, so the value is not
// compared, but go vet flags this conversion — confirm the intended value.
197 testPullLists["Added_before_actual_test_item"] = string(1)
// The request below presents this same token.
199 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
200 testData := PullWorkerTestData{
// performTest matches on this exact name to select the two-entry expectations.
201 name: "TestPullWorkerPullList_with_two_items_latest_replacing_old",
202 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
203 responseCode: http.StatusOK,
204 responseBody: "Received 1 pull requests\n",
205 readContent: "hola de nuevo",
210 performTest(testData, c)
// TestPullWorker_invalidToken PUTs firstPullList to /pull with a token that
// differs from the configured system auth token and expects 401 with the
// "Unauthorized" body.
213 // In this case, the item will not be placed on pullq
214 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
217 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
219 testData := PullWorkerTestData{
// NOTE(review): this name repeats the two-locators test's name instead of
// naming this test; it is only used as a map key in performTest — confirm
// whether the duplication is intentional.
220 name: "TestPullWorkerPullList_with_two_locators",
// The literal "invalidToken" does not match theConfig.systemAuthToken set above.
221 req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
222 responseCode: http.StatusUnauthorized,
223 responseBody: "Unauthorized\n",
224 readContent: "hello",
229 performTest(testData, c)
// performTest runs one pull worker test case. It replaces GetContent and
// writePulledBlock with mocks driven by testData, issues testData.req, checks
// the response code and body, waits for the pull queue counters to reach
// zero, and then checks the bookkeeping maps and the recorded read/put
// results against what testData says should have happened.
232 func performTest(testData PullWorkerTestData, c *C) {
// NOTE(review): presumably a test volume manager with two volumes — confirm.
233 KeepVM = MakeTestVolumeManager(2)
239 currentTestData = testData
// Record that this test submitted a pull list, keyed by test name.
240 testPullLists[testData.name] = testData.responseBody
// Filled in by the mocked GetContent, so it only gains an entry when a get happens.
242 processedPullLists := make(map[string]string)
244 // Override GetContent to mock keepclient Get functionality
// NOTE(review): the deferred call presumably restores the original GetContent,
// as the writePulledBlock defer below does — confirm.
245 defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
248 GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
// While a get is happening the status must report exactly one in-progress item.
249 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
250 processedPullLists[testData.name] = testData.responseBody
// Simulated get failure: return no reader and an error.
251 if testData.readError {
252 err = errors.New("Error getting data")
254 return nil, 0, "", err
// Successful get: remember what was served and return it with its length.
256 readContent = testData.readContent
257 cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
260 return rc, int64(len(testData.readContent)), "", nil
263 // Override writePulledBlock to mock PutBlock functionality
// Restore the original writePulledBlock when performTest returns.
264 defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
265 writePulledBlock = func(v Volume, content []byte, locator string) {
// Simulated put failure: record an error in the package-level putError.
266 if testData.putError {
267 putError = errors.New("Error putting data")
// Before the request is issued nothing may be queued or in progress.
273 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
274 c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
276 response := IssueRequest(&testData.req)
277 c.Assert(response.Code, Equals, testData.responseCode)
278 c.Assert(response.Body.String(), Equals, testData.responseBody)
// Expect InProgress + Queued to equal 0, allowing up to one second.
// NOTE(review): presumably expectEqualWithin polls until then — confirm.
280 expectEqualWithin(c, time.Second, 0, func() interface{} {
282 return st.InProgress + st.Queued
// The replacing-old test pre-populated testPullLists, so two entries are
// expected there, but only one processed list.
285 if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
286 c.Assert(len(testPullLists), Equals, 2)
287 c.Assert(len(processedPullLists), Equals, 1)
// NOTE(review): both maps hold strings, so NotNil looks like it can never
// fail here; the len checks above carry the real assertion — confirm.
288 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
289 c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
290 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
// An accepted request is expected to have been processed exactly once.
292 if testData.responseCode == http.StatusOK {
293 c.Assert(len(testPullLists), Equals, 1)
294 c.Assert(len(processedPullLists), Equals, 1)
295 c.Assert(testPullLists[testData.name], NotNil)
// Otherwise the list was submitted but nothing was processed.
297 c.Assert(len(testPullLists), Equals, 1)
298 c.Assert(len(processedPullLists), Equals, 0)
// Check the recorded read and put outcomes against the flags in testData.
302 if testData.readError {
303 c.Assert(readError, NotNil)
304 } else if testData.responseCode == http.StatusOK {
305 c.Assert(readError, IsNil)
306 c.Assert(readContent, Equals, testData.readContent)
307 if testData.putError {
308 c.Assert(putError, NotNil)
// With no put error, the content written must equal the content read.
310 c.Assert(putError, IsNil)
311 c.Assert(string(putContent), Equals, testData.readContent)
// Finally, the queue should have no further item to hand out.
315 expectChannelEmpty(c, pullq.NextItem)
// ClosingBuffer is a buffer type with a Close method. performTest builds one
// around a *bytes.Buffer holding the mocked block content.
// NOTE(review): presumably this is what the mocked GetContent returns as its
// io.ReadCloser — confirm.
318 type ClosingBuffer struct {
322 func (cb *ClosingBuffer) Close() (err error) {