11644: Accept index/pull/trash requests for a specific mount.
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "io"
7         "net/http"
8         "time"
9
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/keepclient"
12         . "gopkg.in/check.v1"
13 )
14
15 var _ = Suite(&PullWorkerTestSuite{})
16
17 type PullWorkerTestSuite struct{}
18
19 var testPullLists map[string]string
20 var readContent string
21 var readError error
22 var putContent []byte
23 var putError error
24 var currentTestData PullWorkerTestData
25
26 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
27         readContent = ""
28         readError = nil
29         putContent = []byte("")
30         putError = nil
31
32         // When a new pull request arrives, the old one will be overwritten.
33         // This behavior is verified using these two maps in the
34         // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
35         testPullLists = make(map[string]string)
36 }
37
38 // Since keepstore does not come into picture in tests,
39 // we need to explicitly start the goroutine in tests.
40 func RunTestPullWorker(c *C) {
41         arv, err := arvadosclient.MakeArvadosClient()
42         c.Assert(err, Equals, nil)
43         keepClient, err := keepclient.MakeKeepClient(arv)
44         c.Assert(err, Equals, nil)
45
46         pullq = NewWorkQueue()
47         go RunPullWorker(pullq, keepClient)
48 }
49
50 var firstPullList = []byte(`[
51                 {
52                         "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
53                         "servers":[
54                                 "server_1",
55                                 "server_2"
56                         ]
57                 },{
58                         "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
59                         "servers":[
60                                 "server_3"
61                         ]
62                 }
63         ]`)
64
65 var secondPullList = []byte(`[
66                 {
67                         "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
68                         "servers":[
69                                 "server_1",
70                                 "server_2"
71                         ]
72                 }
73         ]`)
74
75 type PullWorkerTestData struct {
76         name         string
77         req          RequestTester
78         responseCode int
79         responseBody string
80         readContent  string
81         readError    bool
82         putError     bool
83 }
84
85 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
86         defer teardown()
87
88         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
89         testData := PullWorkerTestData{
90                 name:         "TestPullWorkerPullList_with_two_locators",
91                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
92                 responseCode: http.StatusOK,
93                 responseBody: "Received 2 pull requests\n",
94                 readContent:  "hello",
95                 readError:    false,
96                 putError:     false,
97         }
98
99         performTest(testData, c)
100 }
101
102 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
103         defer teardown()
104
105         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
106         testData := PullWorkerTestData{
107                 name:         "TestPullWorkerPullList_with_one_locator",
108                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
109                 responseCode: http.StatusOK,
110                 responseBody: "Received 1 pull requests\n",
111                 readContent:  "hola",
112                 readError:    false,
113                 putError:     false,
114         }
115
116         performTest(testData, c)
117 }
118
119 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
120         defer teardown()
121
122         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
123         testData := PullWorkerTestData{
124                 name:         "TestPullWorker_error_on_get_one_locator",
125                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
126                 responseCode: http.StatusOK,
127                 responseBody: "Received 1 pull requests\n",
128                 readContent:  "unused",
129                 readError:    true,
130                 putError:     false,
131         }
132
133         performTest(testData, c)
134 }
135
136 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
137         defer teardown()
138
139         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
140         testData := PullWorkerTestData{
141                 name:         "TestPullWorker_error_on_get_two_locators",
142                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
143                 responseCode: http.StatusOK,
144                 responseBody: "Received 2 pull requests\n",
145                 readContent:  "unused",
146                 readError:    true,
147                 putError:     false,
148         }
149
150         performTest(testData, c)
151 }
152
153 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
154         defer teardown()
155
156         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
157         testData := PullWorkerTestData{
158                 name:         "TestPullWorker_error_on_put_one_locator",
159                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
160                 responseCode: http.StatusOK,
161                 responseBody: "Received 1 pull requests\n",
162                 readContent:  "hello hello",
163                 readError:    false,
164                 putError:     true,
165         }
166
167         performTest(testData, c)
168 }
169
170 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
171         defer teardown()
172
173         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
174         testData := PullWorkerTestData{
175                 name:         "TestPullWorker_error_on_put_two_locators",
176                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
177                 responseCode: http.StatusOK,
178                 responseBody: "Received 2 pull requests\n",
179                 readContent:  "hello again",
180                 readError:    false,
181                 putError:     true,
182         }
183
184         performTest(testData, c)
185 }
186
187 // When a new pull request arrives, the old one is replaced. This test
188 // is used to check that behavior by first putting an item on the queue,
189 // and then performing the test. Thus the "testPullLists" has two entries;
190 // however, processedPullLists will see only the newest item in the list.
191 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
192         defer teardown()
193
194         var firstInput = []int{1}
195         pullq = NewWorkQueue()
196         pullq.ReplaceQueue(makeTestWorkList(firstInput))
197         testPullLists["Added_before_actual_test_item"] = string(1)
198
199         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
200         testData := PullWorkerTestData{
201                 name:         "TestPullWorkerPullList_with_two_items_latest_replacing_old",
202                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
203                 responseCode: http.StatusOK,
204                 responseBody: "Received 1 pull requests\n",
205                 readContent:  "hola de nuevo",
206                 readError:    false,
207                 putError:     false,
208         }
209
210         performTest(testData, c)
211 }
212
213 // In this case, the item will not be placed on pullq
214 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
215         defer teardown()
216
217         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
218
219         testData := PullWorkerTestData{
220                 name:         "TestPullWorkerPullList_with_two_locators",
221                 req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
222                 responseCode: http.StatusUnauthorized,
223                 responseBody: "Unauthorized\n",
224                 readContent:  "hello",
225                 readError:    false,
226                 putError:     false,
227         }
228
229         performTest(testData, c)
230 }
231
232 func performTest(testData PullWorkerTestData, c *C) {
233         KeepVM = MakeTestVolumeManager(2)
234         defer KeepVM.Close()
235
236         RunTestPullWorker(c)
237         defer pullq.Close()
238
239         currentTestData = testData
240         testPullLists[testData.name] = testData.responseBody
241
242         processedPullLists := make(map[string]string)
243
244         // Override GetContent to mock keepclient Get functionality
245         defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
246                 GetContent = orig
247         }(GetContent)
248         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
249                 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
250                 processedPullLists[testData.name] = testData.responseBody
251                 if testData.readError {
252                         err = errors.New("Error getting data")
253                         readError = err
254                         return nil, 0, "", err
255                 }
256                 readContent = testData.readContent
257                 cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
258                 var rc io.ReadCloser
259                 rc = cb
260                 return rc, int64(len(testData.readContent)), "", nil
261         }
262
263         // Override writePulledBlock to mock PutBlock functionality
264         defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
265         writePulledBlock = func(v Volume, content []byte, locator string) {
266                 if testData.putError {
267                         putError = errors.New("Error putting data")
268                         return
269                 }
270                 putContent = content
271         }
272
273         c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
274         c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
275
276         response := IssueRequest(&testData.req)
277         c.Assert(response.Code, Equals, testData.responseCode)
278         c.Assert(response.Body.String(), Equals, testData.responseBody)
279
280         expectEqualWithin(c, time.Second, 0, func() interface{} {
281                 st := pullq.Status()
282                 return st.InProgress + st.Queued
283         })
284
285         if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
286                 c.Assert(len(testPullLists), Equals, 2)
287                 c.Assert(len(processedPullLists), Equals, 1)
288                 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
289                 c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
290                 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
291         } else {
292                 if testData.responseCode == http.StatusOK {
293                         c.Assert(len(testPullLists), Equals, 1)
294                         c.Assert(len(processedPullLists), Equals, 1)
295                         c.Assert(testPullLists[testData.name], NotNil)
296                 } else {
297                         c.Assert(len(testPullLists), Equals, 1)
298                         c.Assert(len(processedPullLists), Equals, 0)
299                 }
300         }
301
302         if testData.readError {
303                 c.Assert(readError, NotNil)
304         } else if testData.responseCode == http.StatusOK {
305                 c.Assert(readError, IsNil)
306                 c.Assert(readContent, Equals, testData.readContent)
307                 if testData.putError {
308                         c.Assert(putError, NotNil)
309                 } else {
310                         c.Assert(putError, IsNil)
311                         c.Assert(string(putContent), Equals, testData.readContent)
312                 }
313         }
314
315         expectChannelEmpty(c, pullq.NextItem)
316 }
317
318 type ClosingBuffer struct {
319         *bytes.Buffer
320 }
321
322 func (cb *ClosingBuffer) Close() (err error) {
323         return
324 }