3761: improved tests with delays
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         . "gopkg.in/check.v1"
7         "io"
8         "net/http"
9         "strings"
10         "testing"
11         "time"
12 )
13
// Bookkeeping shared by every test in this suite; both maps are keyed by the
// test name and hold the response body that test expects.
var (
	// testPullLists gets an entry each time performTest is called.
	testPullLists map[string]string

	// processedPullLists gets an entry each time the mocked GetContent
	// installed by performTest is invoked.
	processedPullLists map[string]string
)
16
// PullWorkerTestSuite is the gocheck suite type that the pull worker tests in
// this file are attached to. It carries no state of its own.
type PullWorkerTestSuite struct{}
18
// Test is the single entry point seen by "go test"; it hands control to
// gocheck's TestingT (gocheck boilerplate).
func Test(t *testing.T) {
        TestingT(t)
}
23
24 // Gocheck boilerplate
25 var _ = Suite(&PullWorkerTestSuite{})
26
27 func (s *PullWorkerTestSuite) SetUpSuite(c *C) {
28         // Since keepstore does not come into picture in tests,
29         // we need to explicitly start the goroutine in tests.
30         go RunPullWorker(pullq.NextItem)
31
32         // When a new pull request arrives, the old one will be overwritten.
33         // This behavior is simulated with delay tests below.
34         testPullLists = make(map[string]string)
35         processedPullLists = make(map[string]string)
36 }
37
38 func (s *PullWorkerTestSuite) TearDownSuite(c *C) {
39         // give the channel enough time to read and process all pull list entries
40         time.Sleep(1000 * time.Millisecond)
41
42         expectWorkerChannelEmpty(c, pullq.NextItem)
43
44         c.Assert(len(processedPullLists), Not(Equals), len(testPullLists))
45 }
46
// first_pull_list is a JSON pull list with two entries: locator1 (to be pulled
// from server_1 or server_2) and locator2 (to be pulled from server_3).
var first_pull_list = []byte(`[
                {
                        "locator":"locator1",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
                },
    {
                        "locator":"locator2",
                        "servers":[
                                "server_3"
                        ]
                }
        ]`)
62
// second_pull_list is a JSON pull list with a single entry: locator3, listed
// with servers server_1 and server_2.
var second_pull_list = []byte(`[
                {
                        "locator":"locator3",
                        "servers":[
                                "server_1",
        "server_2"
                        ]
                }
        ]`)
72
73 type PullWorkerTestData struct {
74         name          string
75         req           RequestTester
76         response_code int
77         response_body string
78         read_content  string
79         read_error    bool
80         put_error     bool
81 }
82
83 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
84         defer teardown()
85
86         data_manager_token = "DATA MANAGER TOKEN"
87         testData := PullWorkerTestData{
88                 "TestPullWorker_pull_list_with_two_locators",
89                 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
90                 http.StatusOK,
91                 "Received 2 pull requests\n",
92                 "hello",
93                 false,
94                 false,
95         }
96
97         performTest(testData, c)
98 }
99
100 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
101         defer teardown()
102
103         data_manager_token = "DATA MANAGER TOKEN"
104         testData := PullWorkerTestData{
105                 "TestPullWorker_pull_list_with_one_locator",
106                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
107                 http.StatusOK,
108                 "Received 1 pull requests\n",
109                 "hola",
110                 false,
111                 false,
112         }
113
114         performTest(testData, c)
115 }
116
117 // When a new pull request arrives, the old one will be overwritten.
118 // Simulate this behavior by inducing delay in GetContent for the delay test(s).
119 // To ensure this delay test is not the last one executed and
120 // hence we cannot verify this behavior, let's run the delay test twice.
121 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_1(c *C) {
122         defer teardown()
123
124         data_manager_token = "DATA MANAGER TOKEN"
125         testData := PullWorkerTestData{
126                 "TestPullWorker_pull_list_with_one_locator_with_delay_1",
127                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
128                 http.StatusOK,
129                 "Received 1 pull requests\n",
130                 "hola",
131                 false,
132                 false,
133         }
134
135         performTest(testData, c)
136 }
137
138 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator_with_delay_2(c *C) {
139         defer teardown()
140
141         data_manager_token = "DATA MANAGER TOKEN"
142         testData := PullWorkerTestData{
143                 "TestPullWorker_pull_list_with_one_locator_with_delay_2",
144                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
145                 http.StatusOK,
146                 "Received 1 pull requests\n",
147                 "hola",
148                 false,
149                 false,
150         }
151
152         performTest(testData, c)
153 }
154
155 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
156         defer teardown()
157
158         data_manager_token = "DATA MANAGER TOKEN"
159         testData := PullWorkerTestData{
160                 "TestPullWorker_error_on_get_one_locator",
161                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
162                 http.StatusOK,
163                 "Received 1 pull requests\n",
164                 "unused",
165                 true,
166                 false,
167         }
168
169         performTest(testData, c)
170 }
171
172 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
173         defer teardown()
174
175         data_manager_token = "DATA MANAGER TOKEN"
176         testData := PullWorkerTestData{
177                 "TestPullWorker_error_on_get_two_locators",
178                 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
179                 http.StatusOK,
180                 "Received 2 pull requests\n",
181                 "unused",
182                 true,
183                 false,
184         }
185
186         performTest(testData, c)
187 }
188
189 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
190         defer teardown()
191
192         data_manager_token = "DATA MANAGER TOKEN"
193         testData := PullWorkerTestData{
194                 "TestPullWorker_error_on_put_one_locator",
195                 RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
196                 http.StatusOK,
197                 "Received 1 pull requests\n",
198                 "unused",
199                 false,
200                 true,
201         }
202
203         performTest(testData, c)
204 }
205
206 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
207         defer teardown()
208
209         data_manager_token = "DATA MANAGER TOKEN"
210         testData := PullWorkerTestData{
211                 "TestPullWorker_error_on_put_two_locators",
212                 RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
213                 http.StatusOK,
214                 "Received 2 pull requests\n",
215                 "unused",
216                 false,
217                 true,
218         }
219
220         performTest(testData, c)
221 }
222
223 func performTest(testData PullWorkerTestData, c *C) {
224         testPullLists[testData.name] = testData.response_body
225
226         // We need to make sure the tests have a slight delay so that we can verify the pull list channel overwrites.
227         time.Sleep(25 * time.Millisecond)
228
229         // Override GetContent to mock keepclient functionality
230         GetContent = func(locator string, signedLocator string) (reader io.ReadCloser, contentLength int64, url string, err error) {
231                 if strings.HasPrefix(testData.name, "TestPullWorker_pull_list_with_one_locator_with_delay") {
232                         time.Sleep(100 * time.Millisecond)
233                 }
234
235                 processedPullLists[testData.name] = testData.response_body
236                 if testData.read_error {
237                         return nil, 0, "", errors.New("Error getting data")
238                 } else {
239                         cb := &ClosingBuffer{bytes.NewBufferString("Hi!")}
240                         var rc io.ReadCloser
241                         rc = cb
242                         return rc, 3, "", nil
243                 }
244         }
245
246         // Override PutContent to mock PutBlock functionality
247         PutContent = func(content []byte, locator string) (err error) {
248                 if testData.put_error {
249                         return errors.New("Error putting data")
250                 } else {
251                         return nil
252                 }
253         }
254
255         response := IssueRequest(&testData.req)
256         c.Assert(testData.response_code, Equals, response.Code)
257         c.Assert(testData.response_body, Equals, response.Body.String())
258 }
259
// ClosingBuffer wraps a *bytes.Buffer and adds a Close method, so the buffer
// can be used where an io.ReadCloser is required.
type ClosingBuffer struct {
	*bytes.Buffer
}

// Close does nothing and always returns nil; there is no underlying resource
// to release.
func (cb *ClosingBuffer) Close() error {
	return nil
}
267
268 func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
269         select {
270         case item := <-workerChannel:
271                 c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
272         default:
273         }
274 }