6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// PullWorkerTestSuite is the gocheck suite type on which the pull worker
// tests in this file are declared as methods. It has no fields.
15 type PullWorkerTestSuite struct{}
17 // Gocheck boilerplate: TestPullWorker is the "go test" entry point for this file.
// NOTE(review): presumably it hands control to gocheck so the suite methods run — confirm.
18 func TestPullWorker(t *testing.T) {
22 // Gocheck boilerplate: register a PullWorkerTestSuite instance with gocheck.
23 var _ = Suite(&PullWorkerTestSuite{})
// testPullLists maps a test name to the response body expected for that
// test. It is re-created in SetUpTest and filled in by performTest.
25 var testPullLists map[string]string
// readContent records the content most recently served by the GetContent
// mock that performTest installs.
26 var readContent string
// currentTestData is the test case currently being run; performTest sets it.
30 var currentTestData PullWorkerTestData
// SetUpTest is the gocheck per-test fixture hook. It resets the package-level
// state shared by the tests in this file.
32 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
// Start each test with empty recorded put content.
35 putContent = []byte("")
38 // When a new pull request arrives, the old one will be overwritten.
39 // This behavior is verified by the
40 // "TestPullWorkerPullList_with_two_items_latest_replacing_old" test, which
// compares testPullLists (reset here) with the processedPullLists map that
// performTest builds locally.
41 testPullLists = make(map[string]string)
44 // RunTestPullWorker starts the pull worker goroutine for a test.
45 // Keepstore itself is not started in tests, so the worker has to be launched explicitly.
//
// It creates an Arvados client and a keep client (asserting that neither
// constructor returns an error), replaces the package-level pullq with a
// new work queue, and runs RunPullWorker on that queue in a goroutine.
46 func RunTestPullWorker(c *C) {
47 arv, err := arvadosclient.MakeArvadosClient()
48 c.Assert(err, Equals, nil)
49 keepClient, err := keepclient.MakeKeepClient(&arv)
50 c.Assert(err, Equals, nil)
// Use a fresh queue so items queued by earlier tests are not seen by this worker.
52 pullq = NewWorkQueue()
53 go RunPullWorker(pullq, keepClient)
// firstPullList is a raw pull-list request body used by the tests below.
// Its visible entries name two locators; tests that send it expect the
// response "Received 2 pull requests\n".
56 var firstPullList = []byte(`[
58 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
64 "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
// secondPullList is a raw pull-list request body used by the tests below.
// Its visible entry names one locator; tests that send it expect the
// response "Received 1 pull requests\n".
71 var secondPullList = []byte(`[
73 "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
// PullWorkerTestData describes one pull worker test case passed to
// performTest. Fields used in this file: name (key into testPullLists),
// req (the request to issue), responseCode and responseBody (the expected
// HTTP response), readContent (the content served by the GetContent mock),
// and readError / putError (make the GetContent / PutContent mocks fail).
81 type PullWorkerTestData struct {
// TestPullWorkerPullList_with_two_locators PUTs firstPullList to /pull using
// the data manager token and expects 200 OK with the body
// "Received 2 pull requests\n". The content served for reads is "hello".
91 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
94 dataManagerToken = "DATA MANAGER TOKEN"
95 testData := PullWorkerTestData{
96 name: "TestPullWorkerPullList_with_two_locators",
97 req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
98 responseCode: http.StatusOK,
99 responseBody: "Received 2 pull requests\n",
100 readContent: "hello",
105 performTest(testData, c)
// TestPullWorkerPullList_with_one_locator PUTs secondPullList to /pull using
// the data manager token and expects 200 OK with the body
// "Received 1 pull requests\n".
108 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
111 dataManagerToken = "DATA MANAGER TOKEN"
112 testData := PullWorkerTestData{
113 name: "TestPullWorkerPullList_with_one_locator",
114 req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
115 responseCode: http.StatusOK,
116 responseBody: "Received 1 pull requests\n",
122 performTest(testData, c)
// TestPullWorker_error_on_get_one_locator PUTs secondPullList to /pull and
// expects the request itself to succeed (200 OK, "Received 1 pull requests\n").
// NOTE(review): going by the test name, readError is presumably set so the
// GetContent mock fails and readContent ("unused") is never served — confirm.
125 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
128 dataManagerToken = "DATA MANAGER TOKEN"
129 testData := PullWorkerTestData{
130 name: "TestPullWorker_error_on_get_one_locator",
131 req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
132 responseCode: http.StatusOK,
133 responseBody: "Received 1 pull requests\n",
134 readContent: "unused",
139 performTest(testData, c)
// TestPullWorker_error_on_get_two_locators PUTs firstPullList to /pull and
// expects the request itself to succeed (200 OK, "Received 2 pull requests\n").
// NOTE(review): going by the test name, readError is presumably set so the
// GetContent mock fails and readContent ("unused") is never served — confirm.
142 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
145 dataManagerToken = "DATA MANAGER TOKEN"
146 testData := PullWorkerTestData{
147 name: "TestPullWorker_error_on_get_two_locators",
148 req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
149 responseCode: http.StatusOK,
150 responseBody: "Received 2 pull requests\n",
151 readContent: "unused",
156 performTest(testData, c)
// TestPullWorker_error_on_put_one_locator PUTs secondPullList to /pull and
// expects the request itself to succeed (200 OK, "Received 1 pull requests\n").
// The content served for reads is "hello hello".
// NOTE(review): going by the test name, putError is presumably set so the
// PutContent mock fails — confirm.
159 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
162 dataManagerToken = "DATA MANAGER TOKEN"
163 testData := PullWorkerTestData{
164 name: "TestPullWorker_error_on_put_one_locator",
165 req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
166 responseCode: http.StatusOK,
167 responseBody: "Received 1 pull requests\n",
168 readContent: "hello hello",
173 performTest(testData, c)
// TestPullWorker_error_on_put_two_locators PUTs firstPullList to /pull and
// expects the request itself to succeed (200 OK, "Received 2 pull requests\n").
// The content served for reads is "hello again".
// NOTE(review): going by the test name, putError is presumably set so the
// PutContent mock fails — confirm.
176 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
179 dataManagerToken = "DATA MANAGER TOKEN"
180 testData := PullWorkerTestData{
181 name: "TestPullWorker_error_on_put_two_locators",
182 req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
183 responseCode: http.StatusOK,
184 responseBody: "Received 2 pull requests\n",
185 readContent: "hello again",
190 performTest(testData, c)
193 // When a new pull request arrives, the old one is replaced. This test
194 // checks that behavior by first putting an item on the queue and then
195 // performing the test. Thus "testPullLists" has two entries;
196 // however, processedPullLists will see only the newest item in the list.
197 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
// Pre-load the queue with one item that the real request is expected to replace.
200 var firstInput = []int{1}
201 pullq = NewWorkQueue()
202 pullq.ReplaceQueue(makeTestWorkList(firstInput))
// Record the pre-loaded item under its own key. Use a string literal:
// string(1) converts the integer as a code point and yields "\x01", not "1".
203 testPullLists["Added_before_actual_test_item"] = "1"
205 dataManagerToken = "DATA MANAGER TOKEN"
206 testData := PullWorkerTestData{
207 name: "TestPullWorkerPullList_with_two_items_latest_replacing_old",
208 req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
209 responseCode: http.StatusOK,
210 responseBody: "Received 1 pull requests\n",
211 readContent: "hola de nuevo",
216 performTest(testData, c)
// TestPullWorker_invalid_dataManagerToken PUTs firstPullList to /pull with a
// token that differs from dataManagerToken and expects 401 with the body
// "Unauthorized\n".
219 // In this case, the item will not be placed on pullq
// NOTE(review): the name field below repeats
// "TestPullWorkerPullList_with_two_locators". In the visible code it is only
// used as a map key, but it looks like a copy-paste leftover; consider
// "TestPullWorker_invalid_dataManagerToken" after checking other uses.
220 func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
223 dataManagerToken = "DATA MANAGER TOKEN"
225 testData := PullWorkerTestData{
226 name: "TestPullWorkerPullList_with_two_locators",
227 req: RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
228 responseCode: http.StatusUnauthorized,
229 responseBody: "Unauthorized\n",
230 readContent: "hello",
235 performTest(testData, c)
// performTest runs one pull worker test case.
//
// It installs a test volume manager, records testData in the package-level
// state, replaces GetContent and PutContent with mocks driven by testData,
// checks that the pull queue is idle, issues testData.req and checks the HTTP
// response, waits up to one second for the queue to drain, and finally
// verifies the recorded pull lists and the read/put results.
238 func performTest(testData PullWorkerTestData, c *C) {
239 KeepVM = MakeTestVolumeManager(2)
245 currentTestData = testData
246 testPullLists[testData.name] = testData.responseBody
// processedPullLists records the test cases whose pull list reached GetContent.
248 processedPullLists := make(map[string]string)
250 // Override GetContent to mock keepclient Get functionality
251 defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
254 GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
// NOTE(review): this mock is presumably invoked from the pull worker
// goroutine; asserting there and writing processedPullLists without visible
// synchronization may be racy — confirm.
255 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
256 processedPullLists[testData.name] = testData.responseBody
257 if testData.readError {
258 err = errors.New("Error getting data")
260 return nil, 0, "", err
262 readContent = testData.readContent
263 cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
266 return rc, int64(len(testData.readContent)), "", nil
269 // Override PutContent to mock PutBlock functionality
270 defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
271 PutContent = func(content []byte, locator string) (err error) {
272 if testData.putError {
273 err = errors.New("Error putting data")
// Before the request is issued nothing may be queued or in progress.
281 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
282 c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
284 response := IssueRequest(&testData.req)
285 c.Assert(response.Code, Equals, testData.responseCode)
286 c.Assert(response.Body.String(), Equals, testData.responseBody)
// Wait (up to one second) until no item is queued or in progress.
288 expectEqualWithin(c, time.Second, 0, func() interface{} {
290 return st.InProgress + st.Queued
293 if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
294 c.Assert(len(testPullLists), Equals, 2)
295 c.Assert(len(processedPullLists), Equals, 1)
// The map values are strings, which are never nil, so NotNil could not
// detect a missing key; compare against the zero value instead.
296 c.Assert(testPullLists["Added_before_actual_test_item"], Not(Equals), "")
297 c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], Not(Equals), "")
298 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], Not(Equals), "")
300 if testData.responseCode == http.StatusOK {
301 c.Assert(len(testPullLists), Equals, 1)
302 c.Assert(len(processedPullLists), Equals, 1)
// As above: a string is never nil, so check for a non-empty entry.
303 c.Assert(testPullLists[testData.name], Not(Equals), "")
305 c.Assert(len(testPullLists), Equals, 1)
306 c.Assert(len(processedPullLists), Equals, 0)
310 if testData.readError {
311 c.Assert(readError, NotNil)
312 } else if testData.responseCode == http.StatusOK {
313 c.Assert(readError, IsNil)
314 c.Assert(readContent, Equals, testData.readContent)
315 if testData.putError {
316 c.Assert(putError, NotNil)
318 c.Assert(putError, IsNil)
319 c.Assert(string(putContent), Equals, testData.readContent)
// Nothing may be left on the pull queue once the test is done.
323 expectChannelEmpty(c, pullq.NextItem)
// ClosingBuffer is the type the GetContent mock in performTest builds around
// a *bytes.Buffer holding the test content. It has a Close method.
// NOTE(review): presumably this exists so the buffer can be returned as an
// io.ReadCloser — confirm.
326 type ClosingBuffer struct {
330 func (cb *ClosingBuffer) Close() (err error) {