6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
14 var _ = Suite(&PullWorkerTestSuite{})
16 type PullWorkerTestSuite struct{}
18 var testPullLists map[string]string
19 var readContent string
23 var currentTestData PullWorkerTestData
25 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
28 putContent = []byte("")
31 // When a new pull request arrives, the old one will be overwritten.
32 // This behavior is verified using these two maps in the
33 // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
34 testPullLists = make(map[string]string)
37 // Since keepstore does not come into picture in tests,
38 // we need to explicitly start the goroutine in tests.
39 func RunTestPullWorker(c *C) {
40 arv, err := arvadosclient.MakeArvadosClient()
41 c.Assert(err, Equals, nil)
42 keepClient, err := keepclient.MakeKeepClient(arv)
43 c.Assert(err, Equals, nil)
45 pullq = NewWorkQueue()
46 go RunPullWorker(pullq, keepClient)
49 var firstPullList = []byte(`[
51 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
57 "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
64 var secondPullList = []byte(`[
66 "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
74 type PullWorkerTestData struct {
84 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
87 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
88 testData := PullWorkerTestData{
89 name: "TestPullWorkerPullList_with_two_locators",
90 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
91 responseCode: http.StatusOK,
92 responseBody: "Received 2 pull requests\n",
98 performTest(testData, c)
101 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
104 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
105 testData := PullWorkerTestData{
106 name: "TestPullWorkerPullList_with_one_locator",
107 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
108 responseCode: http.StatusOK,
109 responseBody: "Received 1 pull requests\n",
115 performTest(testData, c)
118 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
121 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
122 testData := PullWorkerTestData{
123 name: "TestPullWorker_error_on_get_one_locator",
124 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
125 responseCode: http.StatusOK,
126 responseBody: "Received 1 pull requests\n",
127 readContent: "unused",
132 performTest(testData, c)
135 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
138 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
139 testData := PullWorkerTestData{
140 name: "TestPullWorker_error_on_get_two_locators",
141 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
142 responseCode: http.StatusOK,
143 responseBody: "Received 2 pull requests\n",
144 readContent: "unused",
149 performTest(testData, c)
152 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
155 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
156 testData := PullWorkerTestData{
157 name: "TestPullWorker_error_on_put_one_locator",
158 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
159 responseCode: http.StatusOK,
160 responseBody: "Received 1 pull requests\n",
161 readContent: "hello hello",
166 performTest(testData, c)
169 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
172 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
173 testData := PullWorkerTestData{
174 name: "TestPullWorker_error_on_put_two_locators",
175 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
176 responseCode: http.StatusOK,
177 responseBody: "Received 2 pull requests\n",
178 readContent: "hello again",
183 performTest(testData, c)
186 // When a new pull request arrives, the old one is replaced. This test
187 // is used to check that behavior by first putting an item on the queue,
188 // and then performing the test. Thus the "testPullLists" has two entries;
189 // however, processedPullLists will see only the newest item in the list.
190 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
193 var firstInput = []int{1}
194 pullq = NewWorkQueue()
195 pullq.ReplaceQueue(makeTestWorkList(firstInput))
196 testPullLists["Added_before_actual_test_item"] = string(1)
198 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
199 testData := PullWorkerTestData{
200 name: "TestPullWorkerPullList_with_two_items_latest_replacing_old",
201 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
202 responseCode: http.StatusOK,
203 responseBody: "Received 1 pull requests\n",
204 readContent: "hola de nuevo",
209 performTest(testData, c)
212 // In this case, the item will not be placed on pullq
213 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
216 theConfig.systemAuthToken = "DATA MANAGER TOKEN"
218 testData := PullWorkerTestData{
219 name: "TestPullWorkerPullList_with_two_locators",
220 req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
221 responseCode: http.StatusUnauthorized,
222 responseBody: "Unauthorized\n",
223 readContent: "hello",
228 performTest(testData, c)
231 func performTest(testData PullWorkerTestData, c *C) {
232 KeepVM = MakeTestVolumeManager(2)
238 currentTestData = testData
239 testPullLists[testData.name] = testData.responseBody
241 processedPullLists := make(map[string]string)
243 // Override GetContent to mock keepclient Get functionality
244 defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
247 GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
248 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
249 processedPullLists[testData.name] = testData.responseBody
250 if testData.readError {
251 err = errors.New("Error getting data")
253 return nil, 0, "", err
255 readContent = testData.readContent
256 cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
259 return rc, int64(len(testData.readContent)), "", nil
262 // Override PutContent to mock PutBlock functionality
263 defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
264 PutContent = func(content []byte, locator string) (err error) {
265 if testData.putError {
266 err = errors.New("Error putting data")
274 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
275 c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
277 response := IssueRequest(&testData.req)
278 c.Assert(response.Code, Equals, testData.responseCode)
279 c.Assert(response.Body.String(), Equals, testData.responseBody)
281 expectEqualWithin(c, time.Second, 0, func() interface{} {
283 return st.InProgress + st.Queued
286 if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
287 c.Assert(len(testPullLists), Equals, 2)
288 c.Assert(len(processedPullLists), Equals, 1)
289 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
290 c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
291 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
293 if testData.responseCode == http.StatusOK {
294 c.Assert(len(testPullLists), Equals, 1)
295 c.Assert(len(processedPullLists), Equals, 1)
296 c.Assert(testPullLists[testData.name], NotNil)
298 c.Assert(len(testPullLists), Equals, 1)
299 c.Assert(len(processedPullLists), Equals, 0)
303 if testData.readError {
304 c.Assert(readError, NotNil)
305 } else if testData.responseCode == http.StatusOK {
306 c.Assert(readError, IsNil)
307 c.Assert(readContent, Equals, testData.readContent)
308 if testData.putError {
309 c.Assert(putError, NotNil)
311 c.Assert(putError, IsNil)
312 c.Assert(string(putContent), Equals, testData.readContent)
316 expectChannelEmpty(c, pullq.NextItem)
319 type ClosingBuffer struct {
323 func (cb *ClosingBuffer) Close() (err error) {