3761: move test assertions from teardown method into performTest function for better...
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "testing"
12 )
13
// PullWorkerTestSuite is the gocheck suite that groups the pull worker
// tests in this file. It carries no state of its own; the tests share
// package-level variables instead.
type PullWorkerTestSuite struct{}
15
16 // Gocheck boilerplate
17 func TestPullWorker(t *testing.T) {
18         TestingT(t)
19 }
20
21 // Gocheck boilerplate
22 var _ = Suite(&PullWorkerTestSuite{})
23
24 var testPullLists map[string]string
25 var processedPullLists map[string]string
26 var readContent string
27 var readError error
28 var putContent []byte
29 var putError error
30 var currentTestData PullWorkerTestData
31
32 const READ_CONTENT = "Hi!"
33
34 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
35         readContent = ""
36         readError = nil
37         putContent = []byte("")
38         putError = nil
39
40         // When a new pull request arrives, the old one will be overwritten.
41         // This behavior is verified using these maps in the "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
42         testPullLists = make(map[string]string)
43         processedPullLists = make(map[string]string)
44 }
45
46 // Since keepstore does not come into picture in tests,
47 // we need to explicitly start the goroutine in tests.
48 func RunTestPullWorker(c *C) {
49         arv, err := arvadosclient.MakeArvadosClient()
50         c.Assert(err, Equals, nil)
51         keepClient, err := keepclient.MakeKeepClient(&arv)
52         c.Assert(err, Equals, nil)
53
54         pullq = NewWorkQueue()
55         go RunPullWorker(pullq, keepClient)
56 }
57
// first_pull_list is the JSON body of a pull request naming two
// locators: locator1 (two servers) and locator2 (one server).
var first_pull_list = []byte(`[
		{
			"locator":"locator1",
			"servers":[
				"server_1",
				"server_2"
			]
		},
		{
			"locator":"locator2",
			"servers":[
				"server_3"
			]
		}
	]`)
73
// second_pull_list is the JSON body of a pull request naming a single
// locator, locator3, available on two servers.
var second_pull_list = []byte(`[
		{
			"locator":"locator3",
			"servers":[
				"server_1",
				"server_2"
			]
		}
	]`)
83
84 type PullWorkerTestData struct {
85         name          string
86         req           RequestTester
87         response_code int
88         response_body string
89         read_content  string
90         read_error    bool
91         put_error     bool
92 }
93
94 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
95         defer teardown()
96
97         data_manager_token = "DATA MANAGER TOKEN"
98         testData := PullWorkerTestData{
99                 name:          "TestPullWorker_pull_list_with_two_locators",
100                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
101                 response_code: http.StatusOK,
102                 response_body: "Received 2 pull requests\n",
103                 read_content:  "hello",
104                 read_error:    false,
105                 put_error:     false,
106         }
107
108         performTest(testData, c)
109 }
110
111 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
112         defer teardown()
113
114         data_manager_token = "DATA MANAGER TOKEN"
115         testData := PullWorkerTestData{
116                 name:          "TestPullWorker_pull_list_with_one_locator",
117                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
118                 response_code: http.StatusOK,
119                 response_body: "Received 1 pull requests\n",
120                 read_content:  "hola",
121                 read_error:    false,
122                 put_error:     false,
123         }
124
125         performTest(testData, c)
126 }
127
128 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
129         defer teardown()
130
131         data_manager_token = "DATA MANAGER TOKEN"
132         testData := PullWorkerTestData{
133                 name:          "TestPullWorker_error_on_get_one_locator",
134                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
135                 response_code: http.StatusOK,
136                 response_body: "Received 1 pull requests\n",
137                 read_content:  "unused",
138                 read_error:    true,
139                 put_error:     false,
140         }
141
142         performTest(testData, c)
143 }
144
145 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
146         defer teardown()
147
148         data_manager_token = "DATA MANAGER TOKEN"
149         testData := PullWorkerTestData{
150                 name:          "TestPullWorker_error_on_get_two_locators",
151                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
152                 response_code: http.StatusOK,
153                 response_body: "Received 2 pull requests\n",
154                 read_content:  "unused",
155                 read_error:    true,
156                 put_error:     false,
157         }
158
159         performTest(testData, c)
160 }
161
162 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
163         defer teardown()
164
165         data_manager_token = "DATA MANAGER TOKEN"
166         testData := PullWorkerTestData{
167                 name:          "TestPullWorker_error_on_put_one_locator",
168                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
169                 response_code: http.StatusOK,
170                 response_body: "Received 1 pull requests\n",
171                 read_content:  "unused",
172                 read_error:    false,
173                 put_error:     true,
174         }
175
176         performTest(testData, c)
177 }
178
179 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
180         defer teardown()
181
182         data_manager_token = "DATA MANAGER TOKEN"
183         testData := PullWorkerTestData{
184                 name:          "TestPullWorker_error_on_put_two_locators",
185                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
186                 response_code: http.StatusOK,
187                 response_body: "Received 2 pull requests\n",
188                 read_content:  "unused",
189                 read_error:    false,
190                 put_error:     true,
191         }
192
193         performTest(testData, c)
194 }
195
196 // When a new pull request arrives, the old one is replaced. This test
197 // is used to check that behavior by first putting an item on the queue,
198 // and then performing the test. Thus the "testPullLists" has two entries;
199 // however, processedPullLists will see only the newest item in the list.
200 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
201         defer teardown()
202
203         var firstInput = []int{1}
204         pullq = NewWorkQueue()
205         pullq.ReplaceQueue(makeTestWorkList(firstInput))
206         testPullLists["Added_before_actual_test_item"] = string(1)
207
208         data_manager_token = "DATA MANAGER TOKEN"
209         testData := PullWorkerTestData{
210                 name:          "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
211                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
212                 response_code: http.StatusOK,
213                 response_body: "Received 1 pull requests\n",
214                 read_content:  "hola",
215                 read_error:    false,
216                 put_error:     false,
217         }
218
219         performTest(testData, c)
220 }
221
222 func performTest(testData PullWorkerTestData, c *C) {
223         RunTestPullWorker(c)
224
225         currentTestData = testData
226         testPullLists[testData.name] = testData.response_body
227
228         // Override GetContent to mock keepclient Get functionality
229         GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
230                 reader io.ReadCloser, contentLength int64, url string, err error) {
231
232                 processedPullLists[testData.name] = testData.response_body
233                 if testData.read_error {
234                         err = errors.New("Error getting data")
235                         readError = err
236                         return nil, 0, "", err
237                 } else {
238                         readContent = READ_CONTENT
239                         cb := &ClosingBuffer{bytes.NewBufferString(readContent)}
240                         var rc io.ReadCloser
241                         rc = cb
242                         return rc, int64(len(READ_CONTENT)), "", nil
243                 }
244         }
245
246         // Override PutContent to mock PutBlock functionality
247         PutContent = func(content []byte, locator string) (err error) {
248                 if testData.put_error {
249                         err = errors.New("Error putting data")
250                         putError = err
251                         return err
252                 } else {
253                         putContent = content
254                         return nil
255                 }
256         }
257
258         response := IssueRequest(&testData.req)
259         c.Assert(testData.response_code, Equals, response.Code)
260         c.Assert(testData.response_body, Equals, response.Body.String())
261
262         expectWorkerChannelEmpty(c, pullq.NextItem)
263
264         pullq.Close()
265
266         if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
267                 c.Assert(len(testPullLists), Equals, 2)
268                 c.Assert(len(processedPullLists), Equals, 1)
269                 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
270                 c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
271                 c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
272         } else {
273                 c.Assert(len(testPullLists), Equals, 1)
274                 c.Assert(len(processedPullLists), Equals, 1)
275                 c.Assert(testPullLists[testData.name], NotNil)
276         }
277
278         if testData.read_error {
279                 c.Assert(readError, NotNil)
280         } else {
281                 c.Assert(readError, IsNil)
282                 c.Assert(readContent, Equals, READ_CONTENT)
283                 if testData.put_error {
284                         c.Assert(putError, NotNil)
285                 } else {
286                         c.Assert(putError, IsNil)
287                         c.Assert(string(putContent), Equals, READ_CONTENT)
288                 }
289         }
290
291 }
292
// ClosingBuffer adds a no-op Close method to *bytes.Buffer so that an
// in-memory buffer can be used where an io.ReadCloser is required.
type ClosingBuffer struct {
	*bytes.Buffer
}

// Close does nothing and always reports success.
func (cb *ClosingBuffer) Close() error {
	return nil
}
300
301 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
302         select {
303         case item := <-workerChannel:
304                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
305         default:
306         }
307 }
308
309 func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
310         select {
311         case item := <-workerChannel:
312                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
313         default:
314         }
315 }