6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
15 type PullWorkerTestSuite struct{}
17 // Gocheck boilerplate
18 func TestPullWorker(t *testing.T) {
22 // Gocheck boilerplate
23 var _ = Suite(&PullWorkerTestSuite{})
25 var testPullLists map[string]string
26 var readContent string
30 var currentTestData PullWorkerTestData
32 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
35 putContent = []byte("")
38 // When a new pull request arrives, the old one will be overwritten.
39 // This behavior is verified using these two maps in the
40 // "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
41 testPullLists = make(map[string]string)
44 // Since keepstore does not come into picture in tests,
45 // we need to explicitly start the goroutine in tests.
46 func RunTestPullWorker(c *C) {
47 arv, err := arvadosclient.MakeArvadosClient()
48 c.Assert(err, Equals, nil)
49 keepClient, err := keepclient.MakeKeepClient(&arv)
50 c.Assert(err, Equals, nil)
52 pullq = NewWorkQueue()
53 go RunPullWorker(pullq, keepClient)
56 var first_pull_list = []byte(`[
58 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
64 "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
71 var second_pull_list = []byte(`[
73 "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
81 type PullWorkerTestData struct {
91 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
94 data_manager_token = "DATA MANAGER TOKEN"
95 testData := PullWorkerTestData{
96 name: "TestPullWorker_pull_list_with_two_locators",
97 req: RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
98 response_code: http.StatusOK,
99 response_body: "Received 2 pull requests\n",
100 read_content: "hello",
105 performTest(testData, c)
108 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
111 data_manager_token = "DATA MANAGER TOKEN"
112 testData := PullWorkerTestData{
113 name: "TestPullWorker_pull_list_with_one_locator",
114 req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
115 response_code: http.StatusOK,
116 response_body: "Received 1 pull requests\n",
117 read_content: "hola",
122 performTest(testData, c)
125 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
128 data_manager_token = "DATA MANAGER TOKEN"
129 testData := PullWorkerTestData{
130 name: "TestPullWorker_error_on_get_one_locator",
131 req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
132 response_code: http.StatusOK,
133 response_body: "Received 1 pull requests\n",
134 read_content: "unused",
139 performTest(testData, c)
142 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
145 data_manager_token = "DATA MANAGER TOKEN"
146 testData := PullWorkerTestData{
147 name: "TestPullWorker_error_on_get_two_locators",
148 req: RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
149 response_code: http.StatusOK,
150 response_body: "Received 2 pull requests\n",
151 read_content: "unused",
156 performTest(testData, c)
159 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
162 data_manager_token = "DATA MANAGER TOKEN"
163 testData := PullWorkerTestData{
164 name: "TestPullWorker_error_on_put_one_locator",
165 req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
166 response_code: http.StatusOK,
167 response_body: "Received 1 pull requests\n",
168 read_content: "hello hello",
173 performTest(testData, c)
176 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
179 data_manager_token = "DATA MANAGER TOKEN"
180 testData := PullWorkerTestData{
181 name: "TestPullWorker_error_on_put_two_locators",
182 req: RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
183 response_code: http.StatusOK,
184 response_body: "Received 2 pull requests\n",
185 read_content: "hello again",
190 performTest(testData, c)
193 // When a new pull request arrives, the old one is replaced. This test
194 // is used to check that behavior by first putting an item on the queue,
195 // and then performing the test. Thus the "testPullLists" has two entries;
196 // however, processedPullLists will see only the newest item in the list.
197 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
200 var firstInput = []int{1}
201 pullq = NewWorkQueue()
202 pullq.ReplaceQueue(makeTestWorkList(firstInput))
203 testPullLists["Added_before_actual_test_item"] = string(1)
205 data_manager_token = "DATA MANAGER TOKEN"
206 testData := PullWorkerTestData{
207 name: "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
208 req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
209 response_code: http.StatusOK,
210 response_body: "Received 1 pull requests\n",
211 read_content: "hola de nuevo",
216 performTest(testData, c)
219 // In this case, the item will not be placed on pullq
220 func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
223 data_manager_token = "DATA MANAGER TOKEN"
225 testData := PullWorkerTestData{
226 name: "TestPullWorker_pull_list_with_two_locators",
227 req: RequestTester{"/pull", "invalid_data_manager_token", "PUT", first_pull_list},
228 response_code: http.StatusUnauthorized,
229 response_body: "Unauthorized\n",
230 read_content: "hello",
235 performTest(testData, c)
238 func performTest(testData PullWorkerTestData, c *C) {
239 KeepVM = MakeTestVolumeManager(2)
245 currentTestData = testData
246 testPullLists[testData.name] = testData.response_body
248 processedPullLists := make(map[string]string)
250 // Override GetContent to mock keepclient Get functionality
251 defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
254 GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
255 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
256 processedPullLists[testData.name] = testData.response_body
257 if testData.read_error {
258 err = errors.New("Error getting data")
260 return nil, 0, "", err
262 readContent = testData.read_content
263 cb := &ClosingBuffer{bytes.NewBufferString(testData.read_content)}
266 return rc, int64(len(testData.read_content)), "", nil
270 // Override PutContent to mock PutBlock functionality
271 defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
272 PutContent = func(content []byte, locator string) (err error) {
273 if testData.put_error {
274 err = errors.New("Error putting data")
283 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
284 c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
286 response := IssueRequest(&testData.req)
287 c.Assert(response.Code, Equals, testData.response_code)
288 c.Assert(response.Body.String(), Equals, testData.response_body)
290 expectEqualWithin(c, time.Second, 0, func() interface{} {
292 return st.InProgress + st.Queued
295 if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
296 c.Assert(len(testPullLists), Equals, 2)
297 c.Assert(len(processedPullLists), Equals, 1)
298 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
299 c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
300 c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
302 if testData.response_code == http.StatusOK {
303 c.Assert(len(testPullLists), Equals, 1)
304 c.Assert(len(processedPullLists), Equals, 1)
305 c.Assert(testPullLists[testData.name], NotNil)
307 c.Assert(len(testPullLists), Equals, 1)
308 c.Assert(len(processedPullLists), Equals, 0)
312 if testData.read_error {
313 c.Assert(readError, NotNil)
314 } else if testData.response_code == http.StatusOK {
315 c.Assert(readError, IsNil)
316 c.Assert(readContent, Equals, testData.read_content)
317 if testData.put_error {
318 c.Assert(putError, NotNil)
320 c.Assert(putError, IsNil)
321 c.Assert(string(putContent), Equals, testData.read_content)
325 expectChannelEmpty(c, pullq.NextItem)
328 type ClosingBuffer struct {
332 func (cb *ClosingBuffer) Close() (err error) {