11 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
12 "git.curoverse.com/arvados.git/sdk/go/keepclient"
16 var _ = Suite(&PullWorkerTestSuite{})
18 type PullWorkerTestSuite struct {
19 testPullLists map[string]string
26 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
27 theConfig.systemAuthToken = "arbitrary data manager token"
30 s.putContent = []byte{}
33 // When a new pull request arrives, the old one will be overwritten.
34 // This behavior is verified using these two maps in the
35 // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
36 s.testPullLists = make(map[string]string)
38 KeepVM = MakeTestVolumeManager(2)
40 // Normally the pull queue and workers are started by main()
41 // -- tests need to set up their own.
42 arv, err := arvadosclient.MakeArvadosClient()
44 keepClient, err := keepclient.MakeKeepClient(arv)
46 pullq = NewWorkQueue()
47 go RunPullWorker(pullq, keepClient)
50 func (s *PullWorkerTestSuite) TearDownTest(c *C) {
56 theConfig = DefaultConfig()
60 var firstPullList = []byte(`[
62 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
68 "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
75 var secondPullList = []byte(`[
77 "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
85 type PullWorkerTestData struct {
95 // Ensure MountUUID in a pull list is correctly translated to a Volume
96 // argument passed to writePulledBlock().
97 func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
98 defer func(f func(Volume, []byte, string)) {
102 for _, spec := range []struct {
111 sendUUID: KeepVM.Mounts()[0].UUID,
112 expectVolume: KeepVM.Mounts()[0].volume,
115 writePulledBlock = func(v Volume, _ []byte, _ string) {
116 c.Check(v, Equals, spec.expectVolume)
119 resp := IssueRequest(&RequestTester{
121 apiToken: theConfig.systemAuthToken,
123 requestBody: []byte(`[{
124 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
125 "servers":["server_1","server_2"],
126 "mountuuid":"` + spec.sendUUID + `"}]`),
128 c.Assert(resp.Code, Equals, http.StatusOK)
129 expectEqualWithin(c, time.Second, 0, func() interface{} {
131 return st.InProgress + st.Queued
136 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
137 testData := PullWorkerTestData{
138 name: "TestPullWorkerPullList_with_two_locators",
139 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
140 responseCode: http.StatusOK,
141 responseBody: "Received 2 pull requests\n",
142 readContent: "hello",
147 s.performTest(testData, c)
150 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
151 testData := PullWorkerTestData{
152 name: "TestPullWorkerPullList_with_one_locator",
153 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
154 responseCode: http.StatusOK,
155 responseBody: "Received 1 pull requests\n",
161 s.performTest(testData, c)
164 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
165 testData := PullWorkerTestData{
166 name: "TestPullWorker_error_on_get_one_locator",
167 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
168 responseCode: http.StatusOK,
169 responseBody: "Received 1 pull requests\n",
170 readContent: "unused",
175 s.performTest(testData, c)
178 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
179 testData := PullWorkerTestData{
180 name: "TestPullWorker_error_on_get_two_locators",
181 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
182 responseCode: http.StatusOK,
183 responseBody: "Received 2 pull requests\n",
184 readContent: "unused",
189 s.performTest(testData, c)
192 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
193 testData := PullWorkerTestData{
194 name: "TestPullWorker_error_on_put_one_locator",
195 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
196 responseCode: http.StatusOK,
197 responseBody: "Received 1 pull requests\n",
198 readContent: "hello hello",
203 s.performTest(testData, c)
206 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
207 testData := PullWorkerTestData{
208 name: "TestPullWorker_error_on_put_two_locators",
209 req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
210 responseCode: http.StatusOK,
211 responseBody: "Received 2 pull requests\n",
212 readContent: "hello again",
217 s.performTest(testData, c)
220 // In this case, the item will not be placed on pullq
221 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
222 testData := PullWorkerTestData{
223 name: "TestPullWorkerPullList_with_two_locators",
224 req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
225 responseCode: http.StatusUnauthorized,
226 responseBody: "Unauthorized\n",
227 readContent: "hello",
232 s.performTest(testData, c)
235 func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
236 s.testPullLists[testData.name] = testData.responseBody
238 processedPullLists := make(map[string]string)
240 // Override GetContent to mock keepclient Get functionality
241 defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
244 GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
245 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
246 processedPullLists[testData.name] = testData.responseBody
247 if testData.readError {
248 err = errors.New("Error getting data")
252 s.readContent = testData.readContent
253 reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
254 contentLength = int64(len(testData.readContent))
258 // Override writePulledBlock to mock PutBlock functionality
259 defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
260 writePulledBlock = func(v Volume, content []byte, locator string) {
261 if testData.putError {
262 s.putError = errors.New("Error putting data")
265 s.putContent = content
268 c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
269 c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
271 response := IssueRequest(&testData.req)
272 c.Assert(response.Code, Equals, testData.responseCode)
273 c.Assert(response.Body.String(), Equals, testData.responseBody)
275 expectEqualWithin(c, time.Second, 0, func() interface{} {
277 return st.InProgress + st.Queued
280 if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
281 c.Assert(len(s.testPullLists), Equals, 2)
282 c.Assert(len(processedPullLists), Equals, 1)
283 c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
284 c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
285 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
287 if testData.responseCode == http.StatusOK {
288 c.Assert(len(s.testPullLists), Equals, 1)
289 c.Assert(len(processedPullLists), Equals, 1)
290 c.Assert(s.testPullLists[testData.name], NotNil)
292 c.Assert(len(s.testPullLists), Equals, 1)
293 c.Assert(len(processedPullLists), Equals, 0)
297 if testData.readError {
298 c.Assert(s.readError, NotNil)
299 } else if testData.responseCode == http.StatusOK {
300 c.Assert(s.readError, IsNil)
301 c.Assert(s.readContent, Equals, testData.readContent)
302 if testData.putError {
303 c.Assert(s.putError, NotNil)
305 c.Assert(s.putError, IsNil)
306 c.Assert(string(s.putContent), Equals, testData.readContent)
310 expectChannelEmpty(c, pullq.NextItem)