3761: pass keepClient to Pull method
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "strings"
12         "testing"
13         "time"
14 )
15
16 var testPullLists map[string]string
17 var processedPullLists map[string]string
18
19 type PullWorkerTestSuite struct{}
20
21 // Gocheck boilerplate
22 func Test(t *testing.T) {
23         TestingT(t)
24 }
25
26 // Gocheck boilerplate
27 var _ = Suite(&PullWorkerTestSuite{})
28
29 func (s *PullWorkerTestSuite) SetUpSuite(c *C) {
30         // Since keepstore does not come into picture in tests,
31         // we need to explicitly start the goroutine in tests.
32         arv, err := arvadosclient.MakeArvadosClient()
33         c.Assert(err, Equals, nil)
34         keepClient, err := keepclient.MakeKeepClient(&arv)
35         c.Assert(err, Equals, nil)
36         go RunPullWorker(pullq, keepClient)
37
38         // When a new pull request arrives, the old one will be overwritten.
39         // This behavior is simulated with delay tests below.
40         testPullLists = make(map[string]string)
41         processedPullLists = make(map[string]string)
42 }
43
44 func (s *PullWorkerTestSuite) TearDownSuite(c *C) {
45         // give the channel some time to read and process all pull list entries
46         time.Sleep(1000 * time.Millisecond)
47
48         expectWorkerChannelEmpty(c, pullq.NextItem)
49
50         c.Assert(len(processedPullLists), Not(Equals), len(testPullLists))
51 }
52
53 var first_pull_list = []byte(`[
54                 {
55                         "locator":"locator1",
56                         "servers":[
57                                 "server_1",
58                                 "server_2"
59                         ]
60                 },
61     {
62                         "locator":"locator2",
63                         "servers":[
64                                 "server_3"
65                         ]
66                 }
67         ]`)
68
69 var second_pull_list = []byte(`[
70                 {
71                         "locator":"locator3",
72                         "servers":[
73                                 "server_1",
74         "server_2"
75                         ]
76                 }
77         ]`)
78
79 type PullWorkerTestData struct {
80         name          string
81         req           RequestTester
82         response_code int
83         response_body string
84         read_content  string
85         read_error    bool
86         put_error     bool
87 }
88
89 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
90         defer teardown()
91
92         data_manager_token = "DATA MANAGER TOKEN"
93         testData := PullWorkerTestData{
94                 "TestPullWorker_pull_list_with_two_locators",
95                 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
96                 http.StatusOK,
97                 "Received 2 pull requests\n",
98                 "hello",
99                 false,
100                 false,
101         }
102
103         performTest(testData, c)
104 }
105
106 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
107         defer teardown()
108
109         data_manager_token = "DATA MANAGER TOKEN"
110         testData := PullWorkerTestData{
111                 "TestPullWorker_pull_list_with_one_locator",
112                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
113                 http.StatusOK,
114                 "Received 1 pull requests\n",
115                 "hola",
116                 false,
117                 false,
118         }
119
120         performTest(testData, c)
121 }
122
123 // When a new pull request arrives, the old one will be overwritten.
124 // Simulate this behavior by inducing delay in GetContent for the delay test(s).
125 // To ensure this delay test is not the last one executed and
126 // hence we cannot verify this behavior, let's run the delay test twice.
127 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_1(c *C) {
128         defer teardown()
129
130         data_manager_token = "DATA MANAGER TOKEN"
131         testData := PullWorkerTestData{
132                 "TestPullWorker_pull_list_with_one_locator_with_delay_1",
133                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
134                 http.StatusOK,
135                 "Received 1 pull requests\n",
136                 "hola",
137                 false,
138                 false,
139         }
140
141         performTest(testData, c)
142 }
143
144 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_2(c *C) {
145         defer teardown()
146
147         data_manager_token = "DATA MANAGER TOKEN"
148         testData := PullWorkerTestData{
149                 "TestPullWorker_pull_list_with_one_locator_with_delay_2",
150                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
151                 http.StatusOK,
152                 "Received 1 pull requests\n",
153                 "hola",
154                 false,
155                 false,
156         }
157
158         performTest(testData, c)
159 }
160
161 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
162         defer teardown()
163
164         data_manager_token = "DATA MANAGER TOKEN"
165         testData := PullWorkerTestData{
166                 "TestPullWorker_error_on_get_one_locator",
167                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
168                 http.StatusOK,
169                 "Received 1 pull requests\n",
170                 "unused",
171                 true,
172                 false,
173         }
174
175         performTest(testData, c)
176 }
177
178 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
179         defer teardown()
180
181         data_manager_token = "DATA MANAGER TOKEN"
182         testData := PullWorkerTestData{
183                 "TestPullWorker_error_on_get_two_locators",
184                 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
185                 http.StatusOK,
186                 "Received 2 pull requests\n",
187                 "unused",
188                 true,
189                 false,
190         }
191
192         performTest(testData, c)
193 }
194
195 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
196         defer teardown()
197
198         data_manager_token = "DATA MANAGER TOKEN"
199         testData := PullWorkerTestData{
200                 "TestPullWorker_error_on_put_one_locator",
201                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
202                 http.StatusOK,
203                 "Received 1 pull requests\n",
204                 "unused",
205                 false,
206                 true,
207         }
208
209         performTest(testData, c)
210 }
211
212 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
213         defer teardown()
214
215         data_manager_token = "DATA MANAGER TOKEN"
216         testData := PullWorkerTestData{
217                 "TestPullWorker_error_on_put_two_locators",
218                 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
219                 http.StatusOK,
220                 "Received 2 pull requests\n",
221                 "unused",
222                 false,
223                 true,
224         }
225
226         performTest(testData, c)
227 }
228
229 func performTest(testData PullWorkerTestData, c *C) {
230         testPullLists[testData.name] = testData.response_body
231
232         // We need to make sure the tests have a slight delay so that we can verify the pull list channel overwrites.
233         time.Sleep(25 * time.Millisecond)
234
235         // Override GetContent to mock keepclient Get functionality
236         GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
237                 reader io.ReadCloser, contentLength int64, url string, err error) {
238                 if strings.HasPrefix(testData.name, "TestPullWorker_pull_list_with_one_locator_with_delay") {
239                         time.Sleep(100 * time.Millisecond)
240                 }
241
242                 processedPullLists[testData.name] = testData.response_body
243                 if testData.read_error {
244                         return nil, 0, "", errors.New("Error getting data")
245                 } else {
246                         cb := &ClosingBuffer{bytes.NewBufferString("Hi!")}
247                         var rc io.ReadCloser
248                         rc = cb
249                         return rc, 3, "", nil
250                 }
251         }
252
253         // Override PutContent to mock PutBlock functionality
254         PutContent = func(content []byte, locator string) (err error) {
255                 if testData.put_error {
256                         return errors.New("Error putting data")
257                 } else {
258                         return nil
259                 }
260         }
261
262         response := IssueRequest(&testData.req)
263         c.Assert(testData.response_code, Equals, response.Code)
264         c.Assert(testData.response_body, Equals, response.Body.String())
265 }
266
267 type ClosingBuffer struct {
268         *bytes.Buffer
269 }
270
271 func (cb *ClosingBuffer) Close() (err error) {
272         return
273 }
274
275 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
276         select {
277         case item := <-workerChannel:
278                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
279         default:
280         }
281 }