Merge branch '8784-dir-listings'
[arvados.git] / services / keepstore / pull_worker_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "errors"
10         "io"
11         "io/ioutil"
12         "net/http"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
16         "git.curoverse.com/arvados.git/sdk/go/keepclient"
17         . "gopkg.in/check.v1"
18 )
19
20 var _ = Suite(&PullWorkerTestSuite{})
21
22 type PullWorkerTestSuite struct {
23         testPullLists map[string]string
24         readContent   string
25         readError     error
26         putContent    []byte
27         putError      error
28 }
29
30 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
31         theConfig.systemAuthToken = "arbitrary data manager token"
32         s.readContent = ""
33         s.readError = nil
34         s.putContent = []byte{}
35         s.putError = nil
36
37         // When a new pull request arrives, the old one will be overwritten.
38         // This behavior is verified using these two maps in the
39         // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
40         s.testPullLists = make(map[string]string)
41
42         KeepVM = MakeTestVolumeManager(2)
43
44         // Normally the pull queue and workers are started by main()
45         // -- tests need to set up their own.
46         arv, err := arvadosclient.MakeArvadosClient()
47         c.Assert(err, IsNil)
48         keepClient, err := keepclient.MakeKeepClient(arv)
49         c.Assert(err, IsNil)
50         pullq = NewWorkQueue()
51         go RunPullWorker(pullq, keepClient)
52 }
53
54 func (s *PullWorkerTestSuite) TearDownTest(c *C) {
55         KeepVM.Close()
56         KeepVM = nil
57         pullq.Close()
58         pullq = nil
59         teardown()
60         theConfig = DefaultConfig()
61         theConfig.Start()
62 }
63
64 var firstPullList = []byte(`[
65                 {
66                         "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
67                         "servers":[
68                                 "server_1",
69                                 "server_2"
70                         ]
71                 },{
72                         "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
73                         "servers":[
74                                 "server_3"
75                         ]
76                 }
77         ]`)
78
79 var secondPullList = []byte(`[
80                 {
81                         "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
82                         "servers":[
83                                 "server_1",
84                                 "server_2"
85                         ]
86                 }
87         ]`)
88
89 type PullWorkerTestData struct {
90         name         string
91         req          RequestTester
92         responseCode int
93         responseBody string
94         readContent  string
95         readError    bool
96         putError     bool
97 }
98
99 // Ensure MountUUID in a pull list is correctly translated to a Volume
100 // argument passed to writePulledBlock().
101 func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
102         defer func(f func(Volume, []byte, string)) {
103                 writePulledBlock = f
104         }(writePulledBlock)
105
106         for _, spec := range []struct {
107                 sendUUID     string
108                 expectVolume Volume
109         }{
110                 {
111                         sendUUID:     "",
112                         expectVolume: nil,
113                 },
114                 {
115                         sendUUID:     KeepVM.Mounts()[0].UUID,
116                         expectVolume: KeepVM.Mounts()[0].volume,
117                 },
118         } {
119                 writePulledBlock = func(v Volume, _ []byte, _ string) {
120                         c.Check(v, Equals, spec.expectVolume)
121                 }
122
123                 resp := IssueRequest(&RequestTester{
124                         uri:      "/pull",
125                         apiToken: theConfig.systemAuthToken,
126                         method:   "PUT",
127                         requestBody: []byte(`[{
128                                 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
129                                 "servers":["server_1","server_2"],
130                                 "mountuuid":"` + spec.sendUUID + `"}]`),
131                 })
132                 c.Assert(resp.Code, Equals, http.StatusOK)
133                 expectEqualWithin(c, time.Second, 0, func() interface{} {
134                         st := pullq.Status()
135                         return st.InProgress + st.Queued
136                 })
137         }
138 }
139
140 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
141         testData := PullWorkerTestData{
142                 name:         "TestPullWorkerPullList_with_two_locators",
143                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
144                 responseCode: http.StatusOK,
145                 responseBody: "Received 2 pull requests\n",
146                 readContent:  "hello",
147                 readError:    false,
148                 putError:     false,
149         }
150
151         s.performTest(testData, c)
152 }
153
154 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
155         testData := PullWorkerTestData{
156                 name:         "TestPullWorkerPullList_with_one_locator",
157                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
158                 responseCode: http.StatusOK,
159                 responseBody: "Received 1 pull requests\n",
160                 readContent:  "hola",
161                 readError:    false,
162                 putError:     false,
163         }
164
165         s.performTest(testData, c)
166 }
167
168 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
169         testData := PullWorkerTestData{
170                 name:         "TestPullWorker_error_on_get_one_locator",
171                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
172                 responseCode: http.StatusOK,
173                 responseBody: "Received 1 pull requests\n",
174                 readContent:  "unused",
175                 readError:    true,
176                 putError:     false,
177         }
178
179         s.performTest(testData, c)
180 }
181
182 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
183         testData := PullWorkerTestData{
184                 name:         "TestPullWorker_error_on_get_two_locators",
185                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
186                 responseCode: http.StatusOK,
187                 responseBody: "Received 2 pull requests\n",
188                 readContent:  "unused",
189                 readError:    true,
190                 putError:     false,
191         }
192
193         s.performTest(testData, c)
194 }
195
196 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
197         testData := PullWorkerTestData{
198                 name:         "TestPullWorker_error_on_put_one_locator",
199                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
200                 responseCode: http.StatusOK,
201                 responseBody: "Received 1 pull requests\n",
202                 readContent:  "hello hello",
203                 readError:    false,
204                 putError:     true,
205         }
206
207         s.performTest(testData, c)
208 }
209
210 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
211         testData := PullWorkerTestData{
212                 name:         "TestPullWorker_error_on_put_two_locators",
213                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
214                 responseCode: http.StatusOK,
215                 responseBody: "Received 2 pull requests\n",
216                 readContent:  "hello again",
217                 readError:    false,
218                 putError:     true,
219         }
220
221         s.performTest(testData, c)
222 }
223
224 // In this case, the item will not be placed on pullq
225 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
226         testData := PullWorkerTestData{
227                 name:         "TestPullWorkerPullList_with_two_locators",
228                 req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
229                 responseCode: http.StatusUnauthorized,
230                 responseBody: "Unauthorized\n",
231                 readContent:  "hello",
232                 readError:    false,
233                 putError:     false,
234         }
235
236         s.performTest(testData, c)
237 }
238
239 func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
240         s.testPullLists[testData.name] = testData.responseBody
241
242         processedPullLists := make(map[string]string)
243
244         // Override GetContent to mock keepclient Get functionality
245         defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
246                 GetContent = orig
247         }(GetContent)
248         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
249                 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
250                 processedPullLists[testData.name] = testData.responseBody
251                 if testData.readError {
252                         err = errors.New("Error getting data")
253                         s.readError = err
254                         return
255                 }
256                 s.readContent = testData.readContent
257                 reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
258                 contentLength = int64(len(testData.readContent))
259                 return
260         }
261
262         // Override writePulledBlock to mock PutBlock functionality
263         defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
264         writePulledBlock = func(v Volume, content []byte, locator string) {
265                 if testData.putError {
266                         s.putError = errors.New("Error putting data")
267                         return
268                 }
269                 s.putContent = content
270         }
271
272         c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
273         c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
274
275         response := IssueRequest(&testData.req)
276         c.Assert(response.Code, Equals, testData.responseCode)
277         c.Assert(response.Body.String(), Equals, testData.responseBody)
278
279         expectEqualWithin(c, time.Second, 0, func() interface{} {
280                 st := pullq.Status()
281                 return st.InProgress + st.Queued
282         })
283
284         if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
285                 c.Assert(len(s.testPullLists), Equals, 2)
286                 c.Assert(len(processedPullLists), Equals, 1)
287                 c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
288                 c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
289                 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
290         } else {
291                 if testData.responseCode == http.StatusOK {
292                         c.Assert(len(s.testPullLists), Equals, 1)
293                         c.Assert(len(processedPullLists), Equals, 1)
294                         c.Assert(s.testPullLists[testData.name], NotNil)
295                 } else {
296                         c.Assert(len(s.testPullLists), Equals, 1)
297                         c.Assert(len(processedPullLists), Equals, 0)
298                 }
299         }
300
301         if testData.readError {
302                 c.Assert(s.readError, NotNil)
303         } else if testData.responseCode == http.StatusOK {
304                 c.Assert(s.readError, IsNil)
305                 c.Assert(s.readContent, Equals, testData.readContent)
306                 if testData.putError {
307                         c.Assert(s.putError, NotNil)
308                 } else {
309                         c.Assert(s.putError, IsNil)
310                         c.Assert(string(s.putContent), Equals, testData.readContent)
311                 }
312         }
313
314         expectChannelEmpty(c, pullq.NextItem)
315 }