Merge branch 'master' into 6859-fix-invalid-manifests
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "testing"
12         "time"
13 )
14
15 type PullWorkerTestSuite struct{}
16
17 // Gocheck boilerplate
18 func TestPullWorker(t *testing.T) {
19         TestingT(t)
20 }
21
22 // Gocheck boilerplate
23 var _ = Suite(&PullWorkerTestSuite{})
24
25 var testPullLists map[string]string
26 var readContent string
27 var readError error
28 var putContent []byte
29 var putError error
30 var currentTestData PullWorkerTestData
31
32 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
33         readContent = ""
34         readError = nil
35         putContent = []byte("")
36         putError = nil
37
38         // When a new pull request arrives, the old one will be overwritten.
39         // This behavior is verified using these two maps in the
40         // "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
41         testPullLists = make(map[string]string)
42 }
43
44 // Since keepstore does not come into picture in tests,
45 // we need to explicitly start the goroutine in tests.
46 func RunTestPullWorker(c *C) {
47         arv, err := arvadosclient.MakeArvadosClient()
48         c.Assert(err, Equals, nil)
49         keepClient, err := keepclient.MakeKeepClient(&arv)
50         c.Assert(err, Equals, nil)
51
52         pullq = NewWorkQueue()
53         go RunPullWorker(pullq, keepClient)
54 }
55
56 var first_pull_list = []byte(`[
57                 {
58                         "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
59                         "servers":[
60                                 "server_1",
61                                 "server_2"
62                         ]
63                 },{
64                         "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
65                         "servers":[
66                                 "server_3"
67                         ]
68                 }
69         ]`)
70
71 var second_pull_list = []byte(`[
72                 {
73                         "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
74                         "servers":[
75                                 "server_1",
76                                 "server_2"
77                         ]
78                 }
79         ]`)
80
81 type PullWorkerTestData struct {
82         name          string
83         req           RequestTester
84         response_code int
85         response_body string
86         read_content  string
87         read_error    bool
88         put_error     bool
89 }
90
91 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
92         defer teardown()
93
94         data_manager_token = "DATA MANAGER TOKEN"
95         testData := PullWorkerTestData{
96                 name:          "TestPullWorker_pull_list_with_two_locators",
97                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
98                 response_code: http.StatusOK,
99                 response_body: "Received 2 pull requests\n",
100                 read_content:  "hello",
101                 read_error:    false,
102                 put_error:     false,
103         }
104
105         performTest(testData, c)
106 }
107
108 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
109         defer teardown()
110
111         data_manager_token = "DATA MANAGER TOKEN"
112         testData := PullWorkerTestData{
113                 name:          "TestPullWorker_pull_list_with_one_locator",
114                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
115                 response_code: http.StatusOK,
116                 response_body: "Received 1 pull requests\n",
117                 read_content:  "hola",
118                 read_error:    false,
119                 put_error:     false,
120         }
121
122         performTest(testData, c)
123 }
124
125 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
126         defer teardown()
127
128         data_manager_token = "DATA MANAGER TOKEN"
129         testData := PullWorkerTestData{
130                 name:          "TestPullWorker_error_on_get_one_locator",
131                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
132                 response_code: http.StatusOK,
133                 response_body: "Received 1 pull requests\n",
134                 read_content:  "unused",
135                 read_error:    true,
136                 put_error:     false,
137         }
138
139         performTest(testData, c)
140 }
141
142 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
143         defer teardown()
144
145         data_manager_token = "DATA MANAGER TOKEN"
146         testData := PullWorkerTestData{
147                 name:          "TestPullWorker_error_on_get_two_locators",
148                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
149                 response_code: http.StatusOK,
150                 response_body: "Received 2 pull requests\n",
151                 read_content:  "unused",
152                 read_error:    true,
153                 put_error:     false,
154         }
155
156         performTest(testData, c)
157 }
158
159 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
160         defer teardown()
161
162         data_manager_token = "DATA MANAGER TOKEN"
163         testData := PullWorkerTestData{
164                 name:          "TestPullWorker_error_on_put_one_locator",
165                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
166                 response_code: http.StatusOK,
167                 response_body: "Received 1 pull requests\n",
168                 read_content:  "hello hello",
169                 read_error:    false,
170                 put_error:     true,
171         }
172
173         performTest(testData, c)
174 }
175
176 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
177         defer teardown()
178
179         data_manager_token = "DATA MANAGER TOKEN"
180         testData := PullWorkerTestData{
181                 name:          "TestPullWorker_error_on_put_two_locators",
182                 req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
183                 response_code: http.StatusOK,
184                 response_body: "Received 2 pull requests\n",
185                 read_content:  "hello again",
186                 read_error:    false,
187                 put_error:     true,
188         }
189
190         performTest(testData, c)
191 }
192
193 // When a new pull request arrives, the old one is replaced. This test
194 // is used to check that behavior by first putting an item on the queue,
195 // and then performing the test. Thus the "testPullLists" has two entries;
196 // however, processedPullLists will see only the newest item in the list.
197 func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
198         defer teardown()
199
200         var firstInput = []int{1}
201         pullq = NewWorkQueue()
202         pullq.ReplaceQueue(makeTestWorkList(firstInput))
203         testPullLists["Added_before_actual_test_item"] = string(1)
204
205         data_manager_token = "DATA MANAGER TOKEN"
206         testData := PullWorkerTestData{
207                 name:          "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
208                 req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
209                 response_code: http.StatusOK,
210                 response_body: "Received 1 pull requests\n",
211                 read_content:  "hola de nuevo",
212                 read_error:    false,
213                 put_error:     false,
214         }
215
216         performTest(testData, c)
217 }
218
219 // In this case, the item will not be placed on pullq
220 func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
221         defer teardown()
222
223         data_manager_token = "DATA MANAGER TOKEN"
224
225         testData := PullWorkerTestData{
226                 name:          "TestPullWorker_pull_list_with_two_locators",
227                 req:           RequestTester{"/pull", "invalid_data_manager_token", "PUT", first_pull_list},
228                 response_code: http.StatusUnauthorized,
229                 response_body: "Unauthorized\n",
230                 read_content:  "hello",
231                 read_error:    false,
232                 put_error:     false,
233         }
234
235         performTest(testData, c)
236 }
237
238 func performTest(testData PullWorkerTestData, c *C) {
239         KeepVM = MakeTestVolumeManager(2)
240         defer KeepVM.Close()
241
242         RunTestPullWorker(c)
243         defer pullq.Close()
244
245         currentTestData = testData
246         testPullLists[testData.name] = testData.response_body
247
248         processedPullLists := make(map[string]string)
249
250         // Override GetContent to mock keepclient Get functionality
251         defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
252                 GetContent = orig
253         }(GetContent)
254         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
255                 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
256                 processedPullLists[testData.name] = testData.response_body
257                 if testData.read_error {
258                         err = errors.New("Error getting data")
259                         readError = err
260                         return nil, 0, "", err
261                 } else {
262                         readContent = testData.read_content
263                         cb := &ClosingBuffer{bytes.NewBufferString(testData.read_content)}
264                         var rc io.ReadCloser
265                         rc = cb
266                         return rc, int64(len(testData.read_content)), "", nil
267                 }
268         }
269
270         // Override PutContent to mock PutBlock functionality
271         defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
272         PutContent = func(content []byte, locator string) (err error) {
273                 if testData.put_error {
274                         err = errors.New("Error putting data")
275                         putError = err
276                         return err
277                 } else {
278                         putContent = content
279                         return nil
280                 }
281         }
282
283         c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
284         c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
285
286         response := IssueRequest(&testData.req)
287         c.Assert(response.Code, Equals, testData.response_code)
288         c.Assert(response.Body.String(), Equals, testData.response_body)
289
290         expectEqualWithin(c, time.Second, 0, func() interface{} {
291                 st := pullq.Status()
292                 return st.InProgress + st.Queued
293         })
294
295         if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
296                 c.Assert(len(testPullLists), Equals, 2)
297                 c.Assert(len(processedPullLists), Equals, 1)
298                 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
299                 c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
300                 c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
301         } else {
302                 if testData.response_code == http.StatusOK {
303                         c.Assert(len(testPullLists), Equals, 1)
304                         c.Assert(len(processedPullLists), Equals, 1)
305                         c.Assert(testPullLists[testData.name], NotNil)
306                 } else {
307                         c.Assert(len(testPullLists), Equals, 1)
308                         c.Assert(len(processedPullLists), Equals, 0)
309                 }
310         }
311
312         if testData.read_error {
313                 c.Assert(readError, NotNil)
314         } else if testData.response_code == http.StatusOK {
315                 c.Assert(readError, IsNil)
316                 c.Assert(readContent, Equals, testData.read_content)
317                 if testData.put_error {
318                         c.Assert(putError, NotNil)
319                 } else {
320                         c.Assert(putError, IsNil)
321                         c.Assert(string(putContent), Equals, testData.read_content)
322                 }
323         }
324
325         expectChannelEmpty(c, pullq.NextItem)
326 }
327
328 type ClosingBuffer struct {
329         *bytes.Buffer
330 }
331
332 func (cb *ClosingBuffer) Close() (err error) {
333         return
334 }