Merge branch 'master' into 3761-pull-list-worker
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "strings"
12         "testing"
13         "time"
14 )
15
16 var testPullLists map[string]string
17 var processedPullLists map[string]string
18
19 type PullWorkerTestSuite struct{}
20
21 // Gocheck boilerplate
22 func Test(t *testing.T) {
23         TestingT(t)
24 }
25
26 // Gocheck boilerplate
27 var _ = Suite(&PullWorkerTestSuite{})
28
29 func (s *PullWorkerTestSuite) SetUpSuite(c *C) {
30         // Since keepstore does not come into picture in tests,
31         // we need to explicitly start the goroutine in tests.
32         arv, err := arvadosclient.MakeArvadosClient()
33         c.Assert(err, Equals, nil)
34         keepClient, err := keepclient.MakeKeepClient(&arv)
35         c.Assert(err, Equals, nil)
36         go RunPullWorker(pullq.NextItem, keepClient)
37
38         // When a new pull request arrives, the old one will be overwritten.
39         // This behavior is simulated with delay tests below.
40         testPullLists = make(map[string]string)
41         processedPullLists = make(map[string]string)
42 }
43
44 func (s *PullWorkerTestSuite) TearDownSuite(c *C) {
45         // give the channel enough time to read and process all pull list entries
46         time.Sleep(1000 * time.Millisecond)
47
48         expectWorkerChannelEmpty(c, pullq.NextItem)
49
50         c.Assert(len(processedPullLists), Not(Equals), len(testPullLists))
51 }
52
53 var first_pull_list = []byte(`[
54                 {
55                         "locator":"locator1",
56                         "servers":[
57                                 "server_1",
58                                 "server_2"
59                         ]
60                 },
61     {
62                         "locator":"locator2",
63                         "servers":[
64                                 "server_3"
65                         ]
66                 }
67         ]`)
68
69 var second_pull_list = []byte(`[
70                 {
71                         "locator":"locator3",
72                         "servers":[
73                                 "server_1",
74         "server_2"
75                         ]
76                 }
77         ]`)
78
79 type PullWorkerTestData struct {
80         name          string
81         req           RequestTester
82         response_code int
83         response_body string
84         read_content  string
85         read_error    bool
86         put_error     bool
87 }
88
89 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
90         defer teardown()
91
92         data_manager_token = "DATA MANAGER TOKEN"
93         testData := PullWorkerTestData{
94                 "TestPullWorker_pull_list_with_two_locators",
95                 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
96                 http.StatusOK,
97                 "Received 2 pull requests\n",
98                 "hello",
99                 false,
100                 false,
101         }
102
103         performTest(testData, c)
104 }
105
106 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
107         defer teardown()
108
109         data_manager_token = "DATA MANAGER TOKEN"
110         testData := PullWorkerTestData{
111                 "TestPullWorker_pull_list_with_one_locator",
112                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
113                 http.StatusOK,
114                 "Received 1 pull requests\n",
115                 "hola",
116                 false,
117                 false,
118         }
119
120         performTest(testData, c)
121 }
122
123 // When a new pull request arrives, the old one will be overwritten.
124 // Simulate this behavior by inducing delay in GetContent for the delay test(s).
125 // To ensure this delay test is not the last one executed and
126 // hence we cannot verify this behavior, let's run the delay test twice.
127 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_1(c *C) {
128         defer teardown()
129
130         data_manager_token = "DATA MANAGER TOKEN"
131         testData := PullWorkerTestData{
132                 "TestPullWorker_pull_list_with_one_locator_with_delay_1",
133                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
134                 http.StatusOK,
135                 "Received 1 pull requests\n",
136                 "hola",
137                 false,
138                 false,
139         }
140
141         performTest(testData, c)
142 }
143
144 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_2(c *C) {
145         defer teardown()
146
147         data_manager_token = "DATA MANAGER TOKEN"
148         testData := PullWorkerTestData{
149                 "TestPullWorker_pull_list_with_one_locator_with_delay_2",
150                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
151                 http.StatusOK,
152                 "Received 1 pull requests\n",
153                 "hola",
154                 false,
155                 false,
156         }
157
158         performTest(testData, c)
159 }
160
161 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
162         defer teardown()
163
164         data_manager_token = "DATA MANAGER TOKEN"
165         testData := PullWorkerTestData{
166                 "TestPullWorker_error_on_get_one_locator",
167                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
168                 http.StatusOK,
169                 "Received 1 pull requests\n",
170                 "unused",
171                 true,
172                 false,
173         }
174
175         performTest(testData, c)
176 }
177
178 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
179         defer teardown()
180
181         data_manager_token = "DATA MANAGER TOKEN"
182         testData := PullWorkerTestData{
183                 "TestPullWorker_error_on_get_two_locators",
184                 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
185                 http.StatusOK,
186                 "Received 2 pull requests\n",
187                 "unused",
188                 true,
189                 false,
190         }
191
192         performTest(testData, c)
193 }
194
195 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
196         defer teardown()
197
198         data_manager_token = "DATA MANAGER TOKEN"
199         testData := PullWorkerTestData{
200                 "TestPullWorker_error_on_put_one_locator",
201                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
202                 http.StatusOK,
203                 "Received 1 pull requests\n",
204                 "unused",
205                 false,
206                 true,
207         }
208
209         performTest(testData, c)
210 }
211
212 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
213         defer teardown()
214
215         data_manager_token = "DATA MANAGER TOKEN"
216         testData := PullWorkerTestData{
217                 "TestPullWorker_error_on_put_two_locators",
218                 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
219                 http.StatusOK,
220                 "Received 2 pull requests\n",
221                 "unused",
222                 false,
223                 true,
224         }
225
226         performTest(testData, c)
227 }
228
229 func performTest(testData PullWorkerTestData, c *C) {
230         testPullLists[testData.name] = testData.response_body
231
232         // We need to make sure the tests have a slight delay so that we can verify the pull list channel overwrites.
233         time.Sleep(25 * time.Millisecond)
234
235         // Override GetContent to mock keepclient functionality
236         GetContent = func(locator string, signedLocator string) (reader io.ReadCloser, contentLength int64, url string, err error) {
237                 if strings.HasPrefix(testData.name, "TestPullWorker_pull_list_with_one_locator_with_delay") {
238                         time.Sleep(100 * time.Millisecond)
239                 }
240
241                 processedPullLists[testData.name] = testData.response_body
242                 if testData.read_error {
243                         return nil, 0, "", errors.New("Error getting data")
244                 } else {
245                         cb := &ClosingBuffer{bytes.NewBufferString("Hi!")}
246                         var rc io.ReadCloser
247                         rc = cb
248                         return rc, 3, "", nil
249                 }
250         }
251
252         // Override PutContent to mock PutBlock functionality
253         PutContent = func(content []byte, locator string) (err error) {
254                 if testData.put_error {
255                         return errors.New("Error putting data")
256                 } else {
257                         return nil
258                 }
259         }
260
261         response := IssueRequest(&testData.req)
262         c.Assert(testData.response_code, Equals, response.Code)
263         c.Assert(testData.response_body, Equals, response.Body.String())
264 }
265
266 type ClosingBuffer struct {
267         *bytes.Buffer
268 }
269
270 func (cb *ClosingBuffer) Close() (err error) {
271         return
272 }
273
274 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
275         select {
276         case item := <-workerChannel:
277                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
278         default:
279         }
280 }