Merge branch '5414-keep-service-hints' refs #5414
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "testing"
12 )
13
14 type PullWorkerTestSuite struct{}
15
16 // Gocheck boilerplate
17 func TestPullWorker(t *testing.T) {
18         TestingT(t)
19 }
20
21 // Gocheck boilerplate
22 var _ = Suite(&PullWorkerTestSuite{})
23
24 var testPullLists map[string]string
25 var processedPullLists map[string]string
26 var readContent string
27 var readError error
28 var putContent []byte
29 var putError error
30 var currentTestData PullWorkerTestData
31
32 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
33         readContent = ""
34         readError = nil
35         putContent = []byte("")
36         putError = nil
37
38         // When a new pull request arrives, the old one will be overwritten.
39         // This behavior is verified using these two maps in the
40         // "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
41         testPullLists = make(map[string]string)
42         processedPullLists = make(map[string]string)
43 }
44
45 // Since keepstore does not come into picture in tests,
46 // we need to explicitly start the goroutine in tests.
47 func RunTestPullWorker(c *C) {
48         arv, err := arvadosclient.MakeArvadosClient()
49         c.Assert(err, Equals, nil)
50         keepClient, err := keepclient.MakeKeepClient(&arv)
51         c.Assert(err, Equals, nil)
52
53         pullq = NewWorkQueue()
54         go RunPullWorker(pullq, keepClient)
55 }
56
57 var first_pull_list = []byte(`[
58                 {
59                         "locator":"locator1",
60                         "servers":[
61                                 "server_1",
62                                 "server_2"
63                         ]
64                 },
65     {
66                         "locator":"locator2",
67                         "servers":[
68                                 "server_3"
69                         ]
70                 }
71         ]`)
72
73 var second_pull_list = []byte(`[
74                 {
75                         "locator":"locator3",
76                         "servers":[
77                                 "server_1",
78         "server_2"
79                         ]
80                 }
81         ]`)
82
83 type PullWorkerTestData struct {
84         name          string
85         req           RequestTester
86         response_code int
87         response_body string
88         read_content  string
89         read_error    bool
90         put_error     bool
91 }
92
93 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
94         defer teardown()
95
96         data_manager_token = "DATA MANAGER TOKEN"
97         testData := PullWorkerTestData{
98                 name:          "TestPullWorker_pull_list_with_two_locators",
99                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
100                 response_code: http.StatusOK,
101                 response_body: "Received 2 pull requests\n",
102                 read_content:  "hello",
103                 read_error:    false,
104                 put_error:     false,
105         }
106
107         performTest(testData, c)
108 }
109
110 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
111         defer teardown()
112
113         data_manager_token = "DATA MANAGER TOKEN"
114         testData := PullWorkerTestData{
115                 name:          "TestPullWorker_pull_list_with_one_locator",
116                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
117                 response_code: http.StatusOK,
118                 response_body: "Received 1 pull requests\n",
119                 read_content:  "hola",
120                 read_error:    false,
121                 put_error:     false,
122         }
123
124         performTest(testData, c)
125 }
126
127 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
128         defer teardown()
129
130         data_manager_token = "DATA MANAGER TOKEN"
131         testData := PullWorkerTestData{
132                 name:          "TestPullWorker_error_on_get_one_locator",
133                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
134                 response_code: http.StatusOK,
135                 response_body: "Received 1 pull requests\n",
136                 read_content:  "unused",
137                 read_error:    true,
138                 put_error:     false,
139         }
140
141         performTest(testData, c)
142 }
143
144 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
145         defer teardown()
146
147         data_manager_token = "DATA MANAGER TOKEN"
148         testData := PullWorkerTestData{
149                 name:          "TestPullWorker_error_on_get_two_locators",
150                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
151                 response_code: http.StatusOK,
152                 response_body: "Received 2 pull requests\n",
153                 read_content:  "unused",
154                 read_error:    true,
155                 put_error:     false,
156         }
157
158         performTest(testData, c)
159 }
160
161 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
162         defer teardown()
163
164         data_manager_token = "DATA MANAGER TOKEN"
165         testData := PullWorkerTestData{
166                 name:          "TestPullWorker_error_on_put_one_locator",
167                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
168                 response_code: http.StatusOK,
169                 response_body: "Received 1 pull requests\n",
170                 read_content:  "hello hello",
171                 read_error:    false,
172                 put_error:     true,
173         }
174
175         performTest(testData, c)
176 }
177
178 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
179         defer teardown()
180
181         data_manager_token = "DATA MANAGER TOKEN"
182         testData := PullWorkerTestData{
183                 name:          "TestPullWorker_error_on_put_two_locators",
184                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
185                 response_code: http.StatusOK,
186                 response_body: "Received 2 pull requests\n",
187                 read_content:  "hello again",
188                 read_error:    false,
189                 put_error:     true,
190         }
191
192         performTest(testData, c)
193 }
194
195 // When a new pull request arrives, the old one is replaced. This test
196 // is used to check that behavior by first putting an item on the queue,
197 // and then performing the test. Thus the "testPullLists" has two entries;
198 // however, processedPullLists will see only the newest item in the list.
199 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
200         defer teardown()
201
202         var firstInput = []int{1}
203         pullq = NewWorkQueue()
204         pullq.ReplaceQueue(makeTestWorkList(firstInput))
205         testPullLists["Added_before_actual_test_item"] = string(1)
206
207         data_manager_token = "DATA MANAGER TOKEN"
208         testData := PullWorkerTestData{
209                 name:          "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
210                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
211                 response_code: http.StatusOK,
212                 response_body: "Received 1 pull requests\n",
213                 read_content:  "hola de nuevo",
214                 read_error:    false,
215                 put_error:     false,
216         }
217
218         performTest(testData, c)
219 }
220
221 // In this case, the item will not be placed on pullq
222 func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
223         defer teardown()
224
225         data_manager_token = "DATA MANAGER TOKEN"
226
227         testData := PullWorkerTestData{
228                 name:          "TestPullWorker_pull_list_with_two_locators",
229                 req:           RequestTester{"/pull", "invalid_data_manager_token", "PUT", first_pull_list},
230                 response_code: http.StatusUnauthorized,
231                 response_body: "Unauthorized\n",
232                 read_content:  "hello",
233                 read_error:    false,
234                 put_error:     false,
235         }
236
237         performTest(testData, c)
238 }
239
240 func performTest(testData PullWorkerTestData, c *C) {
241         RunTestPullWorker(c)
242
243         currentTestData = testData
244         testPullLists[testData.name] = testData.response_body
245
246         // Override GetContent to mock keepclient Get functionality
247         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
248                 reader io.ReadCloser, contentLength int64, url string, err error) {
249
250                 processedPullLists[testData.name] = testData.response_body
251                 if testData.read_error {
252                         err = errors.New("Error getting data")
253                         readError = err
254                         return nil, 0, "", err
255                 } else {
256                         readContent = testData.read_content
257                         cb := &ClosingBuffer{bytes.NewBufferString(testData.read_content)}
258                         var rc io.ReadCloser
259                         rc = cb
260                         return rc, int64(len(testData.read_content)), "", nil
261                 }
262         }
263
264         // Override PutContent to mock PutBlock functionality
265         PutContent = func(content []byte, locator string) (err error) {
266                 if testData.put_error {
267                         err = errors.New("Error putting data")
268                         putError = err
269                         return err
270                 } else {
271                         putContent = content
272                         return nil
273                 }
274         }
275
276         response := IssueRequest(&testData.req)
277         c.Assert(testData.response_code, Equals, response.Code)
278         c.Assert(testData.response_body, Equals, response.Body.String())
279
280         expectWorkerChannelEmpty(c, pullq.NextItem)
281
282         pullq.Close()
283
284         if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
285                 c.Assert(len(testPullLists), Equals, 2)
286                 c.Assert(len(processedPullLists), Equals, 1)
287                 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
288                 c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
289                 c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
290         } else {
291                 if testData.response_code == http.StatusOK {
292                         c.Assert(len(testPullLists), Equals, 1)
293                         c.Assert(len(processedPullLists), Equals, 1)
294                         c.Assert(testPullLists[testData.name], NotNil)
295                 } else {
296                         c.Assert(len(testPullLists), Equals, 1)
297                         c.Assert(len(processedPullLists), Equals, 0)
298                 }
299         }
300
301         if testData.read_error {
302                 c.Assert(readError, NotNil)
303         } else if testData.response_code == http.StatusOK {
304                 c.Assert(readError, IsNil)
305                 c.Assert(readContent, Equals, testData.read_content)
306                 if testData.put_error {
307                         c.Assert(putError, NotNil)
308                 } else {
309                         c.Assert(putError, IsNil)
310                         c.Assert(string(putContent), Equals, testData.read_content)
311                 }
312         }
313 }
314
315 type ClosingBuffer struct {
316         *bytes.Buffer
317 }
318
319 func (cb *ClosingBuffer) Close() (err error) {
320         return
321 }
322
323 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
324         select {
325         case item := <-workerChannel:
326                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
327         default:
328         }
329 }
330
331 func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
332         select {
333         case item := <-workerChannel:
334                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
335         default:
336         }
337 }