6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
16 var testPullLists map[string]string
17 var processedPullLists map[string]string
19 type PullWorkerTestSuite struct{}
21 // Gocheck boilerplate
22 func Test(t *testing.T) {
26 // Gocheck boilerplate
27 var _ = Suite(&PullWorkerTestSuite{})
29 func (s *PullWorkerTestSuite) SetUpSuite(c *C) {
30 // Since keepstore does not come into the picture in tests,
31 // we need to explicitly start the RunPullWorker goroutine here.
32 arv, err := arvadosclient.MakeArvadosClient()
33 c.Assert(err, Equals, nil)
34 keepClient, err := keepclient.MakeKeepClient(&arv)
35 c.Assert(err, Equals, nil)
36 go RunPullWorker(pullq, keepClient)
38 // When a new pull request arrives, the old one will be overwritten.
39 // This behavior is simulated with delay tests below.
40 testPullLists = make(map[string]string)
41 processedPullLists = make(map[string]string)
44 func (s *PullWorkerTestSuite) TearDownSuite(c *C) {
45 // give the channel some time to read and process all pull list entries
46 time.Sleep(1000 * time.Millisecond)
48 expectWorkerChannelEmpty(c, pullq.NextItem)
50 c.Assert(len(processedPullLists), Not(Equals), len(testPullLists))
53 var first_pull_list = []byte(`[
69 var second_pull_list = []byte(`[
79 type PullWorkerTestData struct {
89 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
92 data_manager_token = "DATA MANAGER TOKEN"
93 testData := PullWorkerTestData{
94 "TestPullWorker_pull_list_with_two_locators",
95 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
97 "Received 2 pull requests\n",
103 performTest(testData, c)
106 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
109 data_manager_token = "DATA MANAGER TOKEN"
110 testData := PullWorkerTestData{
111 "TestPullWorker_pull_list_with_one_locator",
112 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
114 "Received 1 pull requests\n",
120 performTest(testData, c)
123 // When a new pull request arrives, the old one will be overwritten.
124 // Simulate this behavior by inducing a delay in GetContent for the delay test(s).
125 // If the delay test happened to be the last one executed, we could not
126 // verify this behavior, so the delay test is run twice.
127 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_1(c *C) {
130 data_manager_token = "DATA MANAGER TOKEN"
131 testData := PullWorkerTestData{
132 "TestPullWorker_pull_list_with_one_locator_with_delay_1",
133 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
135 "Received 1 pull requests\n",
141 performTest(testData, c)
144 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_2(c *C) {
147 data_manager_token = "DATA MANAGER TOKEN"
148 testData := PullWorkerTestData{
149 "TestPullWorker_pull_list_with_one_locator_with_delay_2",
150 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
152 "Received 1 pull requests\n",
158 performTest(testData, c)
161 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
164 data_manager_token = "DATA MANAGER TOKEN"
165 testData := PullWorkerTestData{
166 "TestPullWorker_error_on_get_one_locator",
167 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
169 "Received 1 pull requests\n",
175 performTest(testData, c)
178 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
181 data_manager_token = "DATA MANAGER TOKEN"
182 testData := PullWorkerTestData{
183 "TestPullWorker_error_on_get_two_locators",
184 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
186 "Received 2 pull requests\n",
192 performTest(testData, c)
195 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
198 data_manager_token = "DATA MANAGER TOKEN"
199 testData := PullWorkerTestData{
200 "TestPullWorker_error_on_put_one_locator",
201 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
203 "Received 1 pull requests\n",
209 performTest(testData, c)
212 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
215 data_manager_token = "DATA MANAGER TOKEN"
216 testData := PullWorkerTestData{
217 "TestPullWorker_error_on_put_two_locators",
218 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
220 "Received 2 pull requests\n",
226 performTest(testData, c)
229 func performTest(testData PullWorkerTestData, c *C) {
230 testPullLists[testData.name] = testData.response_body
232 // We need to make sure the tests have a slight delay so that we can verify the pull list channel overwrites.
233 time.Sleep(25 * time.Millisecond)
235 // Override GetContent to mock keepclient Get functionality
236 GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
237 reader io.ReadCloser, contentLength int64, url string, err error) {
238 if strings.HasPrefix(testData.name, "TestPullWorker_pull_list_with_one_locator_with_delay") {
239 time.Sleep(100 * time.Millisecond)
242 processedPullLists[testData.name] = testData.response_body
243 if testData.read_error {
244 return nil, 0, "", errors.New("Error getting data")
246 cb := &ClosingBuffer{bytes.NewBufferString("Hi!")}
249 return rc, 3, "", nil
253 // Override PutContent to mock PutBlock functionality
254 PutContent = func(content []byte, locator string) (err error) {
255 if testData.put_error {
256 return errors.New("Error putting data")
262 response := IssueRequest(&testData.req)
263 c.Assert(testData.response_code, Equals, response.Code)
264 c.Assert(testData.response_body, Equals, response.Body.String())
267 type ClosingBuffer struct {
271 func (cb *ClosingBuffer) Close() (err error) {
275 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
277 case item := <-workerChannel:
278 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)