Merge branch 'master' into 6277-manifest-validation-api
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "testing"
12 )
13
14 type PullWorkerTestSuite struct{}
15
16 // Gocheck boilerplate
17 func TestPullWorker(t *testing.T) {
18         TestingT(t)
19 }
20
21 // Gocheck boilerplate
22 var _ = Suite(&PullWorkerTestSuite{})
23
24 var testPullLists map[string]string
25 var processedPullLists map[string]string
26 var readContent string
27 var readError error
28 var putContent []byte
29 var putError error
30 var currentTestData PullWorkerTestData
31
32 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
33         readContent = ""
34         readError = nil
35         putContent = []byte("")
36         putError = nil
37
38         // When a new pull request arrives, the old one will be overwritten.
39         // This behavior is verified using these two maps in the
40         // "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
41         testPullLists = make(map[string]string)
42         processedPullLists = make(map[string]string)
43 }
44
45 // Since keepstore does not come into picture in tests,
46 // we need to explicitly start the goroutine in tests.
47 func RunTestPullWorker(c *C) {
48         arv, err := arvadosclient.MakeArvadosClient()
49         c.Assert(err, Equals, nil)
50         keepClient, err := keepclient.MakeKeepClient(&arv)
51         c.Assert(err, Equals, nil)
52
53         pullq = NewWorkQueue()
54         go RunPullWorker(pullq, keepClient)
55 }
56
57 var first_pull_list = []byte(`[
58                 {
59                         "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
60                         "servers":[
61                                 "server_1",
62                                 "server_2"
63                         ]
64                 },{
65                         "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
66                         "servers":[
67                                 "server_3"
68                         ]
69                 }
70         ]`)
71
72 var second_pull_list = []byte(`[
73                 {
74                         "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
75                         "servers":[
76                                 "server_1",
77                                 "server_2"
78                         ]
79                 }
80         ]`)
81
82 type PullWorkerTestData struct {
83         name          string
84         req           RequestTester
85         response_code int
86         response_body string
87         read_content  string
88         read_error    bool
89         put_error     bool
90 }
91
92 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
93         defer teardown()
94
95         data_manager_token = "DATA MANAGER TOKEN"
96         testData := PullWorkerTestData{
97                 name:          "TestPullWorker_pull_list_with_two_locators",
98                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
99                 response_code: http.StatusOK,
100                 response_body: "Received 2 pull requests\n",
101                 read_content:  "hello",
102                 read_error:    false,
103                 put_error:     false,
104         }
105
106         performTest(testData, c)
107 }
108
109 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
110         defer teardown()
111
112         data_manager_token = "DATA MANAGER TOKEN"
113         testData := PullWorkerTestData{
114                 name:          "TestPullWorker_pull_list_with_one_locator",
115                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
116                 response_code: http.StatusOK,
117                 response_body: "Received 1 pull requests\n",
118                 read_content:  "hola",
119                 read_error:    false,
120                 put_error:     false,
121         }
122
123         performTest(testData, c)
124 }
125
126 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
127         defer teardown()
128
129         data_manager_token = "DATA MANAGER TOKEN"
130         testData := PullWorkerTestData{
131                 name:          "TestPullWorker_error_on_get_one_locator",
132                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
133                 response_code: http.StatusOK,
134                 response_body: "Received 1 pull requests\n",
135                 read_content:  "unused",
136                 read_error:    true,
137                 put_error:     false,
138         }
139
140         performTest(testData, c)
141 }
142
143 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
144         defer teardown()
145
146         data_manager_token = "DATA MANAGER TOKEN"
147         testData := PullWorkerTestData{
148                 name:          "TestPullWorker_error_on_get_two_locators",
149                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
150                 response_code: http.StatusOK,
151                 response_body: "Received 2 pull requests\n",
152                 read_content:  "unused",
153                 read_error:    true,
154                 put_error:     false,
155         }
156
157         performTest(testData, c)
158 }
159
160 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
161         defer teardown()
162
163         data_manager_token = "DATA MANAGER TOKEN"
164         testData := PullWorkerTestData{
165                 name:          "TestPullWorker_error_on_put_one_locator",
166                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
167                 response_code: http.StatusOK,
168                 response_body: "Received 1 pull requests\n",
169                 read_content:  "hello hello",
170                 read_error:    false,
171                 put_error:     true,
172         }
173
174         performTest(testData, c)
175 }
176
177 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
178         defer teardown()
179
180         data_manager_token = "DATA MANAGER TOKEN"
181         testData := PullWorkerTestData{
182                 name:          "TestPullWorker_error_on_put_two_locators",
183                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
184                 response_code: http.StatusOK,
185                 response_body: "Received 2 pull requests\n",
186                 read_content:  "hello again",
187                 read_error:    false,
188                 put_error:     true,
189         }
190
191         performTest(testData, c)
192 }
193
194 // When a new pull request arrives, the old one is replaced. This test
195 // is used to check that behavior by first putting an item on the queue,
196 // and then performing the test. Thus the "testPullLists" has two entries;
197 // however, processedPullLists will see only the newest item in the list.
198 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
199         defer teardown()
200
201         var firstInput = []int{1}
202         pullq = NewWorkQueue()
203         pullq.ReplaceQueue(makeTestWorkList(firstInput))
204         testPullLists["Added_before_actual_test_item"] = string(1)
205
206         data_manager_token = "DATA MANAGER TOKEN"
207         testData := PullWorkerTestData{
208                 name:          "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
209                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
210                 response_code: http.StatusOK,
211                 response_body: "Received 1 pull requests\n",
212                 read_content:  "hola de nuevo",
213                 read_error:    false,
214                 put_error:     false,
215         }
216
217         performTest(testData, c)
218 }
219
220 // In this case, the item will not be placed on pullq
221 func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
222         defer teardown()
223
224         data_manager_token = "DATA MANAGER TOKEN"
225
226         testData := PullWorkerTestData{
227                 name:          "TestPullWorker_pull_list_with_two_locators",
228                 req:           RequestTester{"/pull", "invalid_data_manager_token", "PUT", first_pull_list},
229                 response_code: http.StatusUnauthorized,
230                 response_body: "Unauthorized\n",
231                 read_content:  "hello",
232                 read_error:    false,
233                 put_error:     false,
234         }
235
236         performTest(testData, c)
237 }
238
239 func performTest(testData PullWorkerTestData, c *C) {
240         RunTestPullWorker(c)
241
242         currentTestData = testData
243         testPullLists[testData.name] = testData.response_body
244
245         // Override GetContent to mock keepclient Get functionality
246         defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
247         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
248                 reader io.ReadCloser, contentLength int64, url string, err error) {
249
250                 processedPullLists[testData.name] = testData.response_body
251                 if testData.read_error {
252                         err = errors.New("Error getting data")
253                         readError = err
254                         return nil, 0, "", err
255                 } else {
256                         readContent = testData.read_content
257                         cb := &ClosingBuffer{bytes.NewBufferString(testData.read_content)}
258                         var rc io.ReadCloser
259                         rc = cb
260                         return rc, int64(len(testData.read_content)), "", nil
261                 }
262         }
263
264         // Override PutContent to mock PutBlock functionality
265         defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
266         PutContent = func(content []byte, locator string) (err error) {
267                 if testData.put_error {
268                         err = errors.New("Error putting data")
269                         putError = err
270                         return err
271                 } else {
272                         putContent = content
273                         return nil
274                 }
275         }
276
277         response := IssueRequest(&testData.req)
278         c.Assert(response.Code, Equals, testData.response_code)
279         c.Assert(response.Body.String(), Equals, testData.response_body)
280
281         expectWorkerChannelEmpty(c, pullq.NextItem)
282
283         pullq.Close()
284
285         if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
286                 c.Assert(len(testPullLists), Equals, 2)
287                 c.Assert(len(processedPullLists), Equals, 1)
288                 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
289                 c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
290                 c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
291         } else {
292                 if testData.response_code == http.StatusOK {
293                         c.Assert(len(testPullLists), Equals, 1)
294                         c.Assert(len(processedPullLists), Equals, 1)
295                         c.Assert(testPullLists[testData.name], NotNil)
296                 } else {
297                         c.Assert(len(testPullLists), Equals, 1)
298                         c.Assert(len(processedPullLists), Equals, 0)
299                 }
300         }
301
302         if testData.read_error {
303                 c.Assert(readError, NotNil)
304         } else if testData.response_code == http.StatusOK {
305                 c.Assert(readError, IsNil)
306                 c.Assert(readContent, Equals, testData.read_content)
307                 if testData.put_error {
308                         c.Assert(putError, NotNil)
309                 } else {
310                         c.Assert(putError, IsNil)
311                         c.Assert(string(putContent), Equals, testData.read_content)
312                 }
313         }
314 }
315
316 type ClosingBuffer struct {
317         *bytes.Buffer
318 }
319
320 func (cb *ClosingBuffer) Close() (err error) {
321         return
322 }
323
324 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
325         select {
326         case item := <-workerChannel:
327                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
328         default:
329         }
330 }
331
332 func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
333         select {
334         case item := <-workerChannel:
335                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
336         default:
337         }
338 }