Merge branch '13593-async-perm-graph-update'
[arvados.git] / services / keepstore / pull_worker_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "errors"
10         "io"
11         "io/ioutil"
12         "net/http"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
16         "git.curoverse.com/arvados.git/sdk/go/keepclient"
17         "github.com/prometheus/client_golang/prometheus"
18         . "gopkg.in/check.v1"
19 )
20
21 var _ = Suite(&PullWorkerTestSuite{})
22
// PullWorkerTestSuite carries the state shared between a test method
// and the GetContent / writePulledBlock mocks installed by
// performTest. SetUpTest resets every field before each test.
type PullWorkerTestSuite struct {
        // testPullLists maps a test case name to the response body
        // recorded for it by performTest.
        testPullLists map[string]string
        // readContent is the content most recently served by the
        // mocked GetContent.
        readContent   string
        // readError is the error most recently returned by the
        // mocked GetContent, if any.
        readError     error
        // putContent is the content most recently handed to the
        // mocked writePulledBlock.
        putContent    []byte
        // putError is set when the mocked writePulledBlock simulates
        // a write failure.
        putError      error
}
30
31 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
32         theConfig.systemAuthToken = "arbitrary data manager token"
33         s.readContent = ""
34         s.readError = nil
35         s.putContent = []byte{}
36         s.putError = nil
37
38         // When a new pull request arrives, the old one will be overwritten.
39         // This behavior is verified using these two maps in the
40         // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
41         s.testPullLists = make(map[string]string)
42
43         KeepVM = MakeTestVolumeManager(2)
44
45         // Normally the pull queue and workers are started by main()
46         // -- tests need to set up their own.
47         arv, err := arvadosclient.MakeArvadosClient()
48         c.Assert(err, IsNil)
49         keepClient, err := keepclient.MakeKeepClient(arv)
50         c.Assert(err, IsNil)
51         pullq = NewWorkQueue()
52         go RunPullWorker(pullq, keepClient)
53 }
54
// TearDownTest closes and clears the volume manager and pull queue
// that SetUpTest created, calls teardown(), and replaces theConfig
// with a freshly started default configuration.
func (s *PullWorkerTestSuite) TearDownTest(c *C) {
        KeepVM.Close()
        KeepVM = nil
        pullq.Close()
        pullq = nil
        teardown()
        theConfig = DefaultConfig()
        // The replacement config is started with a new, empty
        // prometheus registry.
        theConfig.Start(prometheus.NewRegistry())
}
64
// firstPullList is a pull list request body with two entries: one
// locator offered by two servers and another offered by one server.
var firstPullList = []byte(`[
                {
                        "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
                },{
                        "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
                        "servers":[
                                "server_3"
                        ]
                }
        ]`)
79
// secondPullList is a pull list request body with a single entry:
// one locator offered by two servers.
var secondPullList = []byte(`[
                {
                        "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
                }
        ]`)
89
90 type PullWorkerTestData struct {
91         name         string
92         req          RequestTester
93         responseCode int
94         responseBody string
95         readContent  string
96         readError    bool
97         putError     bool
98 }
99
100 // Ensure MountUUID in a pull list is correctly translated to a Volume
101 // argument passed to writePulledBlock().
102 func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
103         defer func(f func(Volume, []byte, string)) {
104                 writePulledBlock = f
105         }(writePulledBlock)
106
107         for _, spec := range []struct {
108                 sendUUID     string
109                 expectVolume Volume
110         }{
111                 {
112                         sendUUID:     "",
113                         expectVolume: nil,
114                 },
115                 {
116                         sendUUID:     KeepVM.Mounts()[0].UUID,
117                         expectVolume: KeepVM.Mounts()[0].volume,
118                 },
119         } {
120                 writePulledBlock = func(v Volume, _ []byte, _ string) {
121                         c.Check(v, Equals, spec.expectVolume)
122                 }
123
124                 resp := IssueRequest(&RequestTester{
125                         uri:      "/pull",
126                         apiToken: theConfig.systemAuthToken,
127                         method:   "PUT",
128                         requestBody: []byte(`[{
129                                 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
130                                 "servers":["server_1","server_2"],
131                                 "mount_uuid":"` + spec.sendUUID + `"}]`),
132                 })
133                 c.Assert(resp.Code, Equals, http.StatusOK)
134                 expectEqualWithin(c, time.Second, 0, func() interface{} {
135                         st := pullq.Status()
136                         return st.InProgress + st.Queued
137                 })
138         }
139 }
140
141 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
142         testData := PullWorkerTestData{
143                 name:         "TestPullWorkerPullList_with_two_locators",
144                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
145                 responseCode: http.StatusOK,
146                 responseBody: "Received 2 pull requests\n",
147                 readContent:  "hello",
148                 readError:    false,
149                 putError:     false,
150         }
151
152         s.performTest(testData, c)
153 }
154
155 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
156         testData := PullWorkerTestData{
157                 name:         "TestPullWorkerPullList_with_one_locator",
158                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
159                 responseCode: http.StatusOK,
160                 responseBody: "Received 1 pull requests\n",
161                 readContent:  "hola",
162                 readError:    false,
163                 putError:     false,
164         }
165
166         s.performTest(testData, c)
167 }
168
169 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
170         testData := PullWorkerTestData{
171                 name:         "TestPullWorker_error_on_get_one_locator",
172                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
173                 responseCode: http.StatusOK,
174                 responseBody: "Received 1 pull requests\n",
175                 readContent:  "unused",
176                 readError:    true,
177                 putError:     false,
178         }
179
180         s.performTest(testData, c)
181 }
182
183 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
184         testData := PullWorkerTestData{
185                 name:         "TestPullWorker_error_on_get_two_locators",
186                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
187                 responseCode: http.StatusOK,
188                 responseBody: "Received 2 pull requests\n",
189                 readContent:  "unused",
190                 readError:    true,
191                 putError:     false,
192         }
193
194         s.performTest(testData, c)
195 }
196
197 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
198         testData := PullWorkerTestData{
199                 name:         "TestPullWorker_error_on_put_one_locator",
200                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
201                 responseCode: http.StatusOK,
202                 responseBody: "Received 1 pull requests\n",
203                 readContent:  "hello hello",
204                 readError:    false,
205                 putError:     true,
206         }
207
208         s.performTest(testData, c)
209 }
210
211 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
212         testData := PullWorkerTestData{
213                 name:         "TestPullWorker_error_on_put_two_locators",
214                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
215                 responseCode: http.StatusOK,
216                 responseBody: "Received 2 pull requests\n",
217                 readContent:  "hello again",
218                 readError:    false,
219                 putError:     true,
220         }
221
222         s.performTest(testData, c)
223 }
224
225 // In this case, the item will not be placed on pullq
226 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
227         testData := PullWorkerTestData{
228                 name:         "TestPullWorkerPullList_with_two_locators",
229                 req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
230                 responseCode: http.StatusUnauthorized,
231                 responseBody: "Unauthorized\n",
232                 readContent:  "hello",
233                 readError:    false,
234                 putError:     false,
235         }
236
237         s.performTest(testData, c)
238 }
239
240 func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
241         s.testPullLists[testData.name] = testData.responseBody
242
243         processedPullLists := make(map[string]string)
244
245         // Override GetContent to mock keepclient Get functionality
246         defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
247                 GetContent = orig
248         }(GetContent)
249         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
250                 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
251                 processedPullLists[testData.name] = testData.responseBody
252                 if testData.readError {
253                         err = errors.New("Error getting data")
254                         s.readError = err
255                         return
256                 }
257                 s.readContent = testData.readContent
258                 reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
259                 contentLength = int64(len(testData.readContent))
260                 return
261         }
262
263         // Override writePulledBlock to mock PutBlock functionality
264         defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
265         writePulledBlock = func(v Volume, content []byte, locator string) {
266                 if testData.putError {
267                         s.putError = errors.New("Error putting data")
268                         return
269                 }
270                 s.putContent = content
271         }
272
273         c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
274         c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
275         c.Check(getStatusItem("Version"), Not(Equals), "")
276
277         response := IssueRequest(&testData.req)
278         c.Assert(response.Code, Equals, testData.responseCode)
279         c.Assert(response.Body.String(), Equals, testData.responseBody)
280
281         expectEqualWithin(c, time.Second, 0, func() interface{} {
282                 st := pullq.Status()
283                 return st.InProgress + st.Queued
284         })
285
286         if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
287                 c.Assert(len(s.testPullLists), Equals, 2)
288                 c.Assert(len(processedPullLists), Equals, 1)
289                 c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
290                 c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
291                 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
292         } else {
293                 if testData.responseCode == http.StatusOK {
294                         c.Assert(len(s.testPullLists), Equals, 1)
295                         c.Assert(len(processedPullLists), Equals, 1)
296                         c.Assert(s.testPullLists[testData.name], NotNil)
297                 } else {
298                         c.Assert(len(s.testPullLists), Equals, 1)
299                         c.Assert(len(processedPullLists), Equals, 0)
300                 }
301         }
302
303         if testData.readError {
304                 c.Assert(s.readError, NotNil)
305         } else if testData.responseCode == http.StatusOK {
306                 c.Assert(s.readError, IsNil)
307                 c.Assert(s.readContent, Equals, testData.readContent)
308                 if testData.putError {
309                         c.Assert(s.putError, NotNil)
310                 } else {
311                         c.Assert(s.putError, IsNil)
312                         c.Assert(string(s.putContent), Equals, testData.readContent)
313                 }
314         }
315
316         expectChannelEmpty(c, pullq.NextItem)
317 }