refactor as procedural
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "time"
12 )
13
14 var _ = Suite(&PullWorkerTestSuite{})
15
16 type PullWorkerTestSuite struct{}
17
18 var testPullLists map[string]string
19 var readContent string
20 var readError error
21 var putContent []byte
22 var putError error
23 var currentTestData PullWorkerTestData
24
25 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
26         readContent = ""
27         readError = nil
28         putContent = []byte("")
29         putError = nil
30
31         // When a new pull request arrives, the old one will be overwritten.
32         // This behavior is verified using these two maps in the
33         // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
34         testPullLists = make(map[string]string)
35 }
36
37 // Since keepstore does not come into picture in tests,
38 // we need to explicitly start the goroutine in tests.
39 func RunTestPullWorker(c *C) {
40         arv, err := arvadosclient.MakeArvadosClient()
41         c.Assert(err, Equals, nil)
42         keepClient, err := keepclient.MakeKeepClient(arv)
43         c.Assert(err, Equals, nil)
44
45         pullq = NewWorkQueue()
46         go RunPullWorker(pullq, keepClient)
47 }
48
// JSON request bodies that the tests PUT to /pull.
var (
	// firstPullList holds two pull requests: one locator available from
	// server_1 and server_2, and another available from server_3.
	firstPullList = []byte(`[
		{
			"locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
			"servers":[
				"server_1",
				"server_2"
			]
		},{
			"locator":"37b51d194a7513e45b56f6524f2d51f2+3",
			"servers":[
				"server_3"
			]
		}
	]`)

	// secondPullList holds a single pull request whose locator is available
	// from server_1 and server_2.
	secondPullList = []byte(`[
		{
			"locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
			"servers":[
				"server_1",
				"server_2"
			]
		}
	]`)
)
73
74 type PullWorkerTestData struct {
75         name         string
76         req          RequestTester
77         responseCode int
78         responseBody string
79         readContent  string
80         readError    bool
81         putError     bool
82 }
83
84 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
85         defer teardown()
86
87         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
88         testData := PullWorkerTestData{
89                 name:         "TestPullWorkerPullList_with_two_locators",
90                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
91                 responseCode: http.StatusOK,
92                 responseBody: "Received 2 pull requests\n",
93                 readContent:  "hello",
94                 readError:    false,
95                 putError:     false,
96         }
97
98         performTest(testData, c)
99 }
100
101 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
102         defer teardown()
103
104         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
105         testData := PullWorkerTestData{
106                 name:         "TestPullWorkerPullList_with_one_locator",
107                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
108                 responseCode: http.StatusOK,
109                 responseBody: "Received 1 pull requests\n",
110                 readContent:  "hola",
111                 readError:    false,
112                 putError:     false,
113         }
114
115         performTest(testData, c)
116 }
117
118 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
119         defer teardown()
120
121         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
122         testData := PullWorkerTestData{
123                 name:         "TestPullWorker_error_on_get_one_locator",
124                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
125                 responseCode: http.StatusOK,
126                 responseBody: "Received 1 pull requests\n",
127                 readContent:  "unused",
128                 readError:    true,
129                 putError:     false,
130         }
131
132         performTest(testData, c)
133 }
134
135 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
136         defer teardown()
137
138         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
139         testData := PullWorkerTestData{
140                 name:         "TestPullWorker_error_on_get_two_locators",
141                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
142                 responseCode: http.StatusOK,
143                 responseBody: "Received 2 pull requests\n",
144                 readContent:  "unused",
145                 readError:    true,
146                 putError:     false,
147         }
148
149         performTest(testData, c)
150 }
151
152 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
153         defer teardown()
154
155         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
156         testData := PullWorkerTestData{
157                 name:         "TestPullWorker_error_on_put_one_locator",
158                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
159                 responseCode: http.StatusOK,
160                 responseBody: "Received 1 pull requests\n",
161                 readContent:  "hello hello",
162                 readError:    false,
163                 putError:     true,
164         }
165
166         performTest(testData, c)
167 }
168
169 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
170         defer teardown()
171
172         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
173         testData := PullWorkerTestData{
174                 name:         "TestPullWorker_error_on_put_two_locators",
175                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
176                 responseCode: http.StatusOK,
177                 responseBody: "Received 2 pull requests\n",
178                 readContent:  "hello again",
179                 readError:    false,
180                 putError:     true,
181         }
182
183         performTest(testData, c)
184 }
185
186 // When a new pull request arrives, the old one is replaced. This test
187 // is used to check that behavior by first putting an item on the queue,
188 // and then performing the test. Thus the "testPullLists" has two entries;
189 // however, processedPullLists will see only the newest item in the list.
190 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
191         defer teardown()
192
193         var firstInput = []int{1}
194         pullq = NewWorkQueue()
195         pullq.ReplaceQueue(makeTestWorkList(firstInput))
196         testPullLists["Added_before_actual_test_item"] = string(1)
197
198         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
199         testData := PullWorkerTestData{
200                 name:         "TestPullWorkerPullList_with_two_items_latest_replacing_old",
201                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
202                 responseCode: http.StatusOK,
203                 responseBody: "Received 1 pull requests\n",
204                 readContent:  "hola de nuevo",
205                 readError:    false,
206                 putError:     false,
207         }
208
209         performTest(testData, c)
210 }
211
212 // In this case, the item will not be placed on pullq
213 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
214         defer teardown()
215
216         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
217
218         testData := PullWorkerTestData{
219                 name:         "TestPullWorkerPullList_with_two_locators",
220                 req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
221                 responseCode: http.StatusUnauthorized,
222                 responseBody: "Unauthorized\n",
223                 readContent:  "hello",
224                 readError:    false,
225                 putError:     false,
226         }
227
228         performTest(testData, c)
229 }
230
231 func performTest(testData PullWorkerTestData, c *C) {
232         KeepVM = MakeTestVolumeManager(2)
233         defer KeepVM.Close()
234
235         RunTestPullWorker(c)
236         defer pullq.Close()
237
238         currentTestData = testData
239         testPullLists[testData.name] = testData.responseBody
240
241         processedPullLists := make(map[string]string)
242
243         // Override GetContent to mock keepclient Get functionality
244         defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
245                 GetContent = orig
246         }(GetContent)
247         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
248                 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
249                 processedPullLists[testData.name] = testData.responseBody
250                 if testData.readError {
251                         err = errors.New("Error getting data")
252                         readError = err
253                         return nil, 0, "", err
254                 }
255                 readContent = testData.readContent
256                 cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
257                 var rc io.ReadCloser
258                 rc = cb
259                 return rc, int64(len(testData.readContent)), "", nil
260         }
261
262         // Override PutContent to mock PutBlock functionality
263         defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
264         PutContent = func(content []byte, locator string) (err error) {
265                 if testData.putError {
266                         err = errors.New("Error putting data")
267                         putError = err
268                         return err
269                 }
270                 putContent = content
271                 return nil
272         }
273
274         c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
275         c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
276
277         response := IssueRequest(&testData.req)
278         c.Assert(response.Code, Equals, testData.responseCode)
279         c.Assert(response.Body.String(), Equals, testData.responseBody)
280
281         expectEqualWithin(c, time.Second, 0, func() interface{} {
282                 st := pullq.Status()
283                 return st.InProgress + st.Queued
284         })
285
286         if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
287                 c.Assert(len(testPullLists), Equals, 2)
288                 c.Assert(len(processedPullLists), Equals, 1)
289                 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
290                 c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
291                 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
292         } else {
293                 if testData.responseCode == http.StatusOK {
294                         c.Assert(len(testPullLists), Equals, 1)
295                         c.Assert(len(processedPullLists), Equals, 1)
296                         c.Assert(testPullLists[testData.name], NotNil)
297                 } else {
298                         c.Assert(len(testPullLists), Equals, 1)
299                         c.Assert(len(processedPullLists), Equals, 0)
300                 }
301         }
302
303         if testData.readError {
304                 c.Assert(readError, NotNil)
305         } else if testData.responseCode == http.StatusOK {
306                 c.Assert(readError, IsNil)
307                 c.Assert(readContent, Equals, testData.readContent)
308                 if testData.putError {
309                         c.Assert(putError, NotNil)
310                 } else {
311                         c.Assert(putError, IsNil)
312                         c.Assert(string(putContent), Equals, testData.readContent)
313                 }
314         }
315
316         expectChannelEmpty(c, pullq.NextItem)
317 }
318
// ClosingBuffer embeds a *bytes.Buffer and adds a Close method, so that an
// in-memory buffer can be returned where an io.ReadCloser is expected.
type ClosingBuffer struct {
	*bytes.Buffer
}

// Close is a no-op that always returns nil; there is nothing to release for
// an in-memory buffer.
func (*ClosingBuffer) Close() error {
	return nil
}