3761: verify ReadContent and PutContent outputs.
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "strings"
12         "testing"
13         "time"
14 )
15
16 type PullWorkerTestSuite struct{}
17
18 // Gocheck boilerplate
19 func TestPullWorker(t *testing.T) {
20         TestingT(t)
21 }
22
23 // Gocheck boilerplate
24 var _ = Suite(&PullWorkerTestSuite{})
25
26 var testPullLists map[string]string
27 var processedPullLists map[string]string
28 var readContent string
29 var readError error
30 var putContent []byte
31 var putError error
32 var currentTestData PullWorkerTestData
33
34 const READ_CONTENT = "Hi!"
35
36 func RunTestPullWorker(c *C) {
37         // Since keepstore does not come into picture in tests,
38         // we need to explicitly start the goroutine in tests.
39         arv, err := arvadosclient.MakeArvadosClient()
40         c.Assert(err, Equals, nil)
41         keepClient, err := keepclient.MakeKeepClient(&arv)
42         c.Assert(err, Equals, nil)
43
44         pullq = NewWorkQueue()
45         go RunPullWorker(pullq, keepClient)
46 }
47
48 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
49         readContent = ""
50         readError = nil
51         putContent = []byte("")
52         putError = nil
53
54         // When a new pull request arrives, the old one will be overwritten.
55         // This behavior is simulated with delay tests below.
56         testPullLists = make(map[string]string)
57         processedPullLists = make(map[string]string)
58 }
59
60 func (s *PullWorkerTestSuite) TearDownTest(c *C) {
61         time.Sleep(20 * time.Millisecond)
62         expectWorkerChannelEmpty(c, pullq.NextItem)
63
64         // give the channel some time to read and process all pull list entries
65         //      time.Sleep(1000 * time.Millisecond)
66         //      expectWorkerChannelEmpty(c, pullq.NextItem)
67         //      c.Assert(len(processedPullLists), Not(Equals), len(testPullLists))
68
69         if currentTestData.read_error {
70                 c.Assert(readError, NotNil)
71         } else {
72                 c.Assert(readError, IsNil)
73                 c.Assert(readContent, Equals, READ_CONTENT)
74                 if currentTestData.put_error {
75                         c.Assert(putError, NotNil)
76                 } else {
77                         c.Assert(putError, IsNil)
78                         c.Assert(string(putContent), Equals, READ_CONTENT)
79                 }
80         }
81 }
82
83 var first_pull_list = []byte(`[
84                 {
85                         "locator":"locator1",
86                         "servers":[
87                                 "server_1",
88                                 "server_2"
89                         ]
90                 },
91     {
92                         "locator":"locator2",
93                         "servers":[
94                                 "server_3"
95                         ]
96                 }
97         ]`)
98
99 var second_pull_list = []byte(`[
100                 {
101                         "locator":"locator3",
102                         "servers":[
103                                 "server_1",
104         "server_2"
105                         ]
106                 }
107         ]`)
108
109 type PullWorkerTestData struct {
110         name          string
111         req           RequestTester
112         response_code int
113         response_body string
114         read_content  string
115         read_error    bool
116         put_error     bool
117 }
118
119 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
120         defer teardown()
121
122         data_manager_token = "DATA MANAGER TOKEN"
123         testData := PullWorkerTestData{
124                 name:          "TestPullWorker_pull_list_with_two_locators",
125                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
126                 response_code: http.StatusOK,
127                 response_body: "Received 2 pull requests\n",
128                 read_content:  "hello",
129                 read_error:    false,
130                 put_error:     false,
131         }
132
133         performTest(testData, c)
134 }
135
136 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
137         defer teardown()
138
139         data_manager_token = "DATA MANAGER TOKEN"
140         testData := PullWorkerTestData{
141                 name:          "TestPullWorker_pull_list_with_one_locator",
142                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
143                 response_code: http.StatusOK,
144                 response_body: "Received 1 pull requests\n",
145                 read_content:  "hola",
146                 read_error:    false,
147                 put_error:     false,
148         }
149
150         performTest(testData, c)
151 }
152
153 // When a new pull request arrives, the old one will be overwritten.
154 // Simulate this behavior by inducing delay in GetContent for the delay test(s).
155 // To ensure this delay test is not the last one executed and
156 // hence we cannot verify this behavior, let's run the delay test twice.
157 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_1(c *C) {
158         defer teardown()
159
160         data_manager_token = "DATA MANAGER TOKEN"
161         testData := PullWorkerTestData{
162                 name:          "TestPullWorker_pull_list_with_one_locator_with_delay_1",
163                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
164                 response_code: http.StatusOK,
165                 response_body: "Received 1 pull requests\n",
166                 read_content:  "hola",
167                 read_error:    false,
168                 put_error:     false,
169         }
170
171         performTest(testData, c)
172 }
173
174 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_2(c *C) {
175         defer teardown()
176
177         data_manager_token = "DATA MANAGER TOKEN"
178         testData := PullWorkerTestData{
179                 name:          "TestPullWorker_pull_list_with_one_locator_with_delay_2",
180                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
181                 response_code: http.StatusOK,
182                 response_body: "Received 1 pull requests\n",
183                 read_content:  "hola",
184                 read_error:    false,
185                 put_error:     false,
186         }
187
188         performTest(testData, c)
189 }
190
191 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
192         defer teardown()
193
194         data_manager_token = "DATA MANAGER TOKEN"
195         testData := PullWorkerTestData{
196                 name:          "TestPullWorker_error_on_get_one_locator",
197                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
198                 response_code: http.StatusOK,
199                 response_body: "Received 1 pull requests\n",
200                 read_content:  "unused",
201                 read_error:    true,
202                 put_error:     false,
203         }
204
205         performTest(testData, c)
206 }
207
208 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
209         defer teardown()
210
211         data_manager_token = "DATA MANAGER TOKEN"
212         testData := PullWorkerTestData{
213                 name:          "TestPullWorker_error_on_get_two_locators",
214                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
215                 response_code: http.StatusOK,
216                 response_body: "Received 2 pull requests\n",
217                 read_content:  "unused",
218                 read_error:    true,
219                 put_error:     false,
220         }
221
222         performTest(testData, c)
223 }
224
225 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
226         defer teardown()
227
228         data_manager_token = "DATA MANAGER TOKEN"
229         testData := PullWorkerTestData{
230                 name:          "TestPullWorker_error_on_put_one_locator",
231                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
232                 response_code: http.StatusOK,
233                 response_body: "Received 1 pull requests\n",
234                 read_content:  "unused",
235                 read_error:    false,
236                 put_error:     true,
237         }
238
239         performTest(testData, c)
240 }
241
242 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
243         defer teardown()
244
245         data_manager_token = "DATA MANAGER TOKEN"
246         testData := PullWorkerTestData{
247                 name:          "TestPullWorker_error_on_put_two_locators",
248                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
249                 response_code: http.StatusOK,
250                 response_body: "Received 2 pull requests\n",
251                 read_content:  "unused",
252                 read_error:    false,
253                 put_error:     true,
254         }
255
256         performTest(testData, c)
257 }
258
259 func performTest(testData PullWorkerTestData, c *C) {
260         RunTestPullWorker(c)
261
262         currentTestData = testData
263         testPullLists[testData.name] = testData.response_body
264
265         // We need to make sure the tests have a slight delay so that we can verify the pull list channel overwrites.
266         //      time.Sleep(25 * time.Millisecond)
267
268         // Override GetContent to mock keepclient Get functionality
269         GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
270                 reader io.ReadCloser, contentLength int64, url string, err error) {
271                 if strings.HasPrefix(testData.name, "TestPullWorker_pull_list_with_one_locator_with_delay_1") {
272                         //                      time.Sleep(100 * time.Millisecond)
273                 }
274
275                 processedPullLists[testData.name] = testData.response_body
276                 if testData.read_error {
277                         err = errors.New("Error getting data")
278                         readError = err
279                         return nil, 0, "", err
280                 } else {
281                         readContent = READ_CONTENT
282                         cb := &ClosingBuffer{bytes.NewBufferString(readContent)}
283                         var rc io.ReadCloser
284                         rc = cb
285                         return rc, 3, "", nil
286                 }
287         }
288
289         // Override PutContent to mock PutBlock functionality
290         PutContent = func(content []byte, locator string) (err error) {
291                 if testData.put_error {
292                         err = errors.New("Error putting data")
293                         putError = err
294                         return err
295                 } else {
296                         putContent = content
297                         return nil
298                 }
299         }
300
301         response := IssueRequest(&testData.req)
302         c.Assert(testData.response_code, Equals, response.Code)
303         c.Assert(testData.response_body, Equals, response.Body.String())
304 }
305
306 type ClosingBuffer struct {
307         *bytes.Buffer
308 }
309
310 func (cb *ClosingBuffer) Close() (err error) {
311         return
312 }
313
314 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
315         select {
316         case item := <-workerChannel:
317                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
318         default:
319         }
320 }