3761: Improve the tests to RunTestPullWorker for each test and verify the queue is...
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "testing"
12 )
13
14 type PullWorkerTestSuite struct{}
15
16 // Gocheck boilerplate
17 func TestPullWorker(t *testing.T) {
18         TestingT(t)
19 }
20
21 // Gocheck boilerplate
22 var _ = Suite(&PullWorkerTestSuite{})
23
24 var testPullLists map[string]string
25 var processedPullLists map[string]string
26 var readContent string
27 var readError error
28 var putContent []byte
29 var putError error
30 var currentTestData PullWorkerTestData
31
32 const READ_CONTENT = "Hi!"
33
34 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
35         readContent = ""
36         readError = nil
37         putContent = []byte("")
38         putError = nil
39
40         // When a new pull request arrives, the old one will be overwritten.
41         // This behavior is verified using these maps in the "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
42         testPullLists = make(map[string]string)
43         processedPullLists = make(map[string]string)
44 }
45
46 func (s *PullWorkerTestSuite) TearDownTest(c *C) {
47         expectWorkerChannelEmpty(c, pullq.NextItem)
48
49         if currentTestData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
50                 c.Assert(len(testPullLists), Equals, 2)
51                 c.Assert(len(processedPullLists), Equals, 1)
52                 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
53                 c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
54                 c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
55         }
56
57         if currentTestData.read_error {
58                 c.Assert(readError, NotNil)
59         } else {
60                 c.Assert(readError, IsNil)
61                 c.Assert(readContent, Equals, READ_CONTENT)
62                 if currentTestData.put_error {
63                         c.Assert(putError, NotNil)
64                 } else {
65                         c.Assert(putError, IsNil)
66                         c.Assert(string(putContent), Equals, READ_CONTENT)
67                 }
68         }
69 }
70
71 // Since keepstore does not come into picture in tests,
72 // we need to explicitly start the goroutine in tests.
73 func RunTestPullWorker(c *C) {
74         arv, err := arvadosclient.MakeArvadosClient()
75         c.Assert(err, Equals, nil)
76         keepClient, err := keepclient.MakeKeepClient(&arv)
77         c.Assert(err, Equals, nil)
78
79         pullq = NewWorkQueue()
80         go RunPullWorker(pullq, keepClient)
81 }
82
83 var first_pull_list = []byte(`[
84                 {
85                         "locator":"locator1",
86                         "servers":[
87                                 "server_1",
88                                 "server_2"
89                         ]
90                 },
91     {
92                         "locator":"locator2",
93                         "servers":[
94                                 "server_3"
95                         ]
96                 }
97         ]`)
98
99 var second_pull_list = []byte(`[
100                 {
101                         "locator":"locator3",
102                         "servers":[
103                                 "server_1",
104         "server_2"
105                         ]
106                 }
107         ]`)
108
109 type PullWorkerTestData struct {
110         name          string
111         req           RequestTester
112         response_code int
113         response_body string
114         read_content  string
115         read_error    bool
116         put_error     bool
117 }
118
119 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
120         defer teardown()
121
122         data_manager_token = "DATA MANAGER TOKEN"
123         testData := PullWorkerTestData{
124                 name:          "TestPullWorker_pull_list_with_two_locators",
125                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
126                 response_code: http.StatusOK,
127                 response_body: "Received 2 pull requests\n",
128                 read_content:  "hello",
129                 read_error:    false,
130                 put_error:     false,
131         }
132
133         performTest(testData, c)
134 }
135
136 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
137         defer teardown()
138
139         data_manager_token = "DATA MANAGER TOKEN"
140         testData := PullWorkerTestData{
141                 name:          "TestPullWorker_pull_list_with_one_locator",
142                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
143                 response_code: http.StatusOK,
144                 response_body: "Received 1 pull requests\n",
145                 read_content:  "hola",
146                 read_error:    false,
147                 put_error:     false,
148         }
149
150         performTest(testData, c)
151 }
152
153 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
154         defer teardown()
155
156         data_manager_token = "DATA MANAGER TOKEN"
157         testData := PullWorkerTestData{
158                 name:          "TestPullWorker_error_on_get_one_locator",
159                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
160                 response_code: http.StatusOK,
161                 response_body: "Received 1 pull requests\n",
162                 read_content:  "unused",
163                 read_error:    true,
164                 put_error:     false,
165         }
166
167         performTest(testData, c)
168 }
169
170 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
171         defer teardown()
172
173         data_manager_token = "DATA MANAGER TOKEN"
174         testData := PullWorkerTestData{
175                 name:          "TestPullWorker_error_on_get_two_locators",
176                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
177                 response_code: http.StatusOK,
178                 response_body: "Received 2 pull requests\n",
179                 read_content:  "unused",
180                 read_error:    true,
181                 put_error:     false,
182         }
183
184         performTest(testData, c)
185 }
186
187 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
188         defer teardown()
189
190         data_manager_token = "DATA MANAGER TOKEN"
191         testData := PullWorkerTestData{
192                 name:          "TestPullWorker_error_on_put_one_locator",
193                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
194                 response_code: http.StatusOK,
195                 response_body: "Received 1 pull requests\n",
196                 read_content:  "unused",
197                 read_error:    false,
198                 put_error:     true,
199         }
200
201         performTest(testData, c)
202 }
203
204 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
205         defer teardown()
206
207         data_manager_token = "DATA MANAGER TOKEN"
208         testData := PullWorkerTestData{
209                 name:          "TestPullWorker_error_on_put_two_locators",
210                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
211                 response_code: http.StatusOK,
212                 response_body: "Received 2 pull requests\n",
213                 read_content:  "unused",
214                 read_error:    false,
215                 put_error:     true,
216         }
217
218         performTest(testData, c)
219 }
220
221 // When a new pull request arrives, the old one is replaced. This test
222 // is used to check that behavior by first putting an item on the queue,
223 // and then performing the test. Thus the "testPullLists" has two entries;
224 // however, processedPullLists will see only the newest item in the list.
225 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
226         defer teardown()
227
228         var firstInput = []int{1}
229         pullq = NewWorkQueue()
230         pullq.ReplaceQueue(makeTestWorkList(firstInput))
231         testPullLists["Added_before_actual_test_item"] = string(1)
232
233         data_manager_token = "DATA MANAGER TOKEN"
234         testData := PullWorkerTestData{
235                 name:          "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
236                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
237                 response_code: http.StatusOK,
238                 response_body: "Received 1 pull requests\n",
239                 read_content:  "hola",
240                 read_error:    false,
241                 put_error:     false,
242         }
243
244         performTest(testData, c)
245 }
246
247 func performTest(testData PullWorkerTestData, c *C) {
248         RunTestPullWorker(c)
249
250         currentTestData = testData
251         testPullLists[testData.name] = testData.response_body
252
253         // Override GetContent to mock keepclient Get functionality
254         GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
255                 reader io.ReadCloser, contentLength int64, url string, err error) {
256
257                 processedPullLists[testData.name] = testData.response_body
258                 if testData.read_error {
259                         err = errors.New("Error getting data")
260                         readError = err
261                         return nil, 0, "", err
262                 } else {
263                         readContent = READ_CONTENT
264                         cb := &ClosingBuffer{bytes.NewBufferString(readContent)}
265                         var rc io.ReadCloser
266                         rc = cb
267                         return rc, int64(len(READ_CONTENT)), "", nil
268                 }
269         }
270
271         // Override PutContent to mock PutBlock functionality
272         PutContent = func(content []byte, locator string) (err error) {
273                 if testData.put_error {
274                         err = errors.New("Error putting data")
275                         putError = err
276                         return err
277                 } else {
278                         putContent = content
279                         return nil
280                 }
281         }
282
283         response := IssueRequest(&testData.req)
284         c.Assert(testData.response_code, Equals, response.Code)
285         c.Assert(testData.response_body, Equals, response.Body.String())
286 }
287
288 type ClosingBuffer struct {
289         *bytes.Buffer
290 }
291
292 func (cb *ClosingBuffer) Close() (err error) {
293         return
294 }
295
296 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
297         select {
298         case item := <-workerChannel:
299                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
300         default:
301         }
302 }
303
304 func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
305         select {
306         case item := <-workerChannel:
307                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
308         default:
309         }
310 }