Merge branch 'master' into 3761-pull-list-worker
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "testing"
12 )
13
14 type PullWorkerTestSuite struct{}
15
16 // Gocheck boilerplate
17 func TestPullWorker(t *testing.T) {
18         TestingT(t)
19 }
20
21 // Gocheck boilerplate
22 var _ = Suite(&PullWorkerTestSuite{})
23
24 var testPullLists map[string]string
25 var processedPullLists map[string]string
26 var readContent string
27 var readError error
28 var putContent []byte
29 var putError error
30 var currentTestData PullWorkerTestData
31
32 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
33         readContent = ""
34         readError = nil
35         putContent = []byte("")
36         putError = nil
37
38         // When a new pull request arrives, the old one will be overwritten.
39         // This behavior is verified using these two maps in the
40         // "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
41         testPullLists = make(map[string]string)
42         processedPullLists = make(map[string]string)
43 }
44
45 // Since keepstore does not come into picture in tests,
46 // we need to explicitly start the goroutine in tests.
47 func RunTestPullWorker(c *C) {
48         arv, err := arvadosclient.MakeArvadosClient()
49         c.Assert(err, Equals, nil)
50         keepClient, err := keepclient.MakeKeepClient(&arv)
51         c.Assert(err, Equals, nil)
52
53         pullq = NewWorkQueue()
54         go RunPullWorker(pullq, keepClient)
55 }
56
57 var first_pull_list = []byte(`[
58                 {
59                         "locator":"locator1",
60                         "servers":[
61                                 "server_1",
62                                 "server_2"
63                         ]
64                 },
65     {
66                         "locator":"locator2",
67                         "servers":[
68                                 "server_3"
69                         ]
70                 }
71         ]`)
72
73 var second_pull_list = []byte(`[
74                 {
75                         "locator":"locator3",
76                         "servers":[
77                                 "server_1",
78         "server_2"
79                         ]
80                 }
81         ]`)
82
83 type PullWorkerTestData struct {
84         name          string
85         req           RequestTester
86         response_code int
87         response_body string
88         read_content  string
89         read_error    bool
90         put_error     bool
91 }
92
93 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
94         defer teardown()
95
96         data_manager_token = "DATA MANAGER TOKEN"
97         testData := PullWorkerTestData{
98                 name:          "TestPullWorker_pull_list_with_two_locators",
99                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
100                 response_code: http.StatusOK,
101                 response_body: "Received 2 pull requests\n",
102                 read_content:  "hello",
103                 read_error:    false,
104                 put_error:     false,
105         }
106
107         performTest(testData, c)
108 }
109
110 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
111         defer teardown()
112
113         data_manager_token = "DATA MANAGER TOKEN"
114         testData := PullWorkerTestData{
115                 name:          "TestPullWorker_pull_list_with_one_locator",
116                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
117                 response_code: http.StatusOK,
118                 response_body: "Received 1 pull requests\n",
119                 read_content:  "hola",
120                 read_error:    false,
121                 put_error:     false,
122         }
123
124         performTest(testData, c)
125 }
126
127 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
128         defer teardown()
129
130         data_manager_token = "DATA MANAGER TOKEN"
131         testData := PullWorkerTestData{
132                 name:          "TestPullWorker_error_on_get_one_locator",
133                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
134                 response_code: http.StatusOK,
135                 response_body: "Received 1 pull requests\n",
136                 read_content:  "unused",
137                 read_error:    true,
138                 put_error:     false,
139         }
140
141         performTest(testData, c)
142 }
143
144 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
145         defer teardown()
146
147         data_manager_token = "DATA MANAGER TOKEN"
148         testData := PullWorkerTestData{
149                 name:          "TestPullWorker_error_on_get_two_locators",
150                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
151                 response_code: http.StatusOK,
152                 response_body: "Received 2 pull requests\n",
153                 read_content:  "unused",
154                 read_error:    true,
155                 put_error:     false,
156         }
157
158         performTest(testData, c)
159 }
160
161 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
162         defer teardown()
163
164         data_manager_token = "DATA MANAGER TOKEN"
165         testData := PullWorkerTestData{
166                 name:          "TestPullWorker_error_on_put_one_locator",
167                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
168                 response_code: http.StatusOK,
169                 response_body: "Received 1 pull requests\n",
170                 read_content:  "hello hello",
171                 read_error:    false,
172                 put_error:     true,
173         }
174
175         performTest(testData, c)
176 }
177
178 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
179         defer teardown()
180
181         data_manager_token = "DATA MANAGER TOKEN"
182         testData := PullWorkerTestData{
183                 name:          "TestPullWorker_error_on_put_two_locators",
184                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
185                 response_code: http.StatusOK,
186                 response_body: "Received 2 pull requests\n",
187                 read_content:  "hello again",
188                 read_error:    false,
189                 put_error:     true,
190         }
191
192         performTest(testData, c)
193 }
194
195 // When a new pull request arrives, the old one is replaced. This test
196 // is used to check that behavior by first putting an item on the queue,
197 // and then performing the test. Thus the "testPullLists" has two entries;
198 // however, processedPullLists will see only the newest item in the list.
199 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
200         defer teardown()
201
202         var firstInput = []int{1}
203         pullq = NewWorkQueue()
204         pullq.ReplaceQueue(makeTestWorkList(firstInput))
205         testPullLists["Added_before_actual_test_item"] = string(1)
206
207         data_manager_token = "DATA MANAGER TOKEN"
208         testData := PullWorkerTestData{
209                 name:          "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
210                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
211                 response_code: http.StatusOK,
212                 response_body: "Received 1 pull requests\n",
213                 read_content:  "hola de nuevo",
214                 read_error:    false,
215                 put_error:     false,
216         }
217
218         performTest(testData, c)
219 }
220
221 func performTest(testData PullWorkerTestData, c *C) {
222         RunTestPullWorker(c)
223
224         currentTestData = testData
225         testPullLists[testData.name] = testData.response_body
226
227         // Override GetContent to mock keepclient Get functionality
228         GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
229                 reader io.ReadCloser, contentLength int64, url string, err error) {
230
231                 processedPullLists[testData.name] = testData.response_body
232                 if testData.read_error {
233                         err = errors.New("Error getting data")
234                         readError = err
235                         return nil, 0, "", err
236                 } else {
237                         readContent = testData.read_content
238                         cb := &ClosingBuffer{bytes.NewBufferString(testData.read_content)}
239                         var rc io.ReadCloser
240                         rc = cb
241                         return rc, int64(len(testData.read_content)), "", nil
242                 }
243         }
244
245         // Override PutContent to mock PutBlock functionality
246         PutContent = func(content []byte, locator string) (err error) {
247                 if testData.put_error {
248                         err = errors.New("Error putting data")
249                         putError = err
250                         return err
251                 } else {
252                         putContent = content
253                         return nil
254                 }
255         }
256
257         response := IssueRequest(&testData.req)
258         c.Assert(testData.response_code, Equals, response.Code)
259         c.Assert(testData.response_body, Equals, response.Body.String())
260
261         expectWorkerChannelEmpty(c, pullq.NextItem)
262
263         pullq.Close()
264
265         if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
266                 c.Assert(len(testPullLists), Equals, 2)
267                 c.Assert(len(processedPullLists), Equals, 1)
268                 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
269                 c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
270                 c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
271         } else {
272                 c.Assert(len(testPullLists), Equals, 1)
273                 c.Assert(len(processedPullLists), Equals, 1)
274                 c.Assert(testPullLists[testData.name], NotNil)
275         }
276
277         if testData.read_error {
278                 c.Assert(readError, NotNil)
279         } else {
280                 c.Assert(readError, IsNil)
281                 c.Assert(readContent, Equals, testData.read_content)
282                 if testData.put_error {
283                         c.Assert(putError, NotNil)
284                 } else {
285                         c.Assert(putError, IsNil)
286                         c.Assert(string(putContent), Equals, testData.read_content)
287                 }
288         }
289
290 }
291
292 type ClosingBuffer struct {
293         *bytes.Buffer
294 }
295
296 func (cb *ClosingBuffer) Close() (err error) {
297         return
298 }
299
300 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
301         select {
302         case item := <-workerChannel:
303                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
304         default:
305         }
306 }
307
308 func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
309         select {
310         case item := <-workerChannel:
311                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
312         default:
313         }
314 }