Merge branch '21146-pysdk-new-websockets'
[arvados.git] / services / keepstore / pull_worker_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "io"
12         "io/ioutil"
13         "net/http"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/keepclient"
18         "github.com/prometheus/client_golang/prometheus"
19         . "gopkg.in/check.v1"
20         check "gopkg.in/check.v1"
21 )
22
23 var _ = Suite(&PullWorkerTestSuite{})
24
25 type PullWorkerTestSuite struct {
26         cluster *arvados.Cluster
27         handler *handler
28
29         testPullLists map[string]string
30         readContent   string
31         readError     error
32         putContent    []byte
33         putError      error
34 }
35
36 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
37         s.cluster = testCluster(c)
38         s.cluster.Volumes = map[string]arvados.Volume{
39                 "zzzzz-nyw5e-000000000000000": {Driver: "mock"},
40                 "zzzzz-nyw5e-111111111111111": {Driver: "mock"},
41         }
42         s.cluster.Collections.BlobReplicateConcurrency = 1
43
44         s.handler = &handler{}
45         c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
46
47         s.readContent = ""
48         s.readError = nil
49         s.putContent = []byte{}
50         s.putError = nil
51
52         // When a new pull request arrives, the old one will be overwritten.
53         // This behavior is verified using these two maps in the
54         // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
55         s.testPullLists = make(map[string]string)
56 }
57
// Pull list fixtures used as PUT /pull request bodies by the tests in
// this file.
var (
	// firstPullList holds two pull requests: one block available
	// from server_1 and server_2, and one available from server_3.
	firstPullList = []byte(`[
                {
                        "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
                },{
                        "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
                        "servers":[
                                "server_3"
                        ]
                }
        ]`)

	// secondPullList holds a single pull request for a block
	// available from server_1 and server_2.
	secondPullList = []byte(`[
                {
                        "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
                }
        ]`)
)
82
83 type PullWorkerTestData struct {
84         name         string
85         req          RequestTester
86         responseCode int
87         responseBody string
88         readContent  string
89         readError    bool
90         putError     bool
91 }
92
93 // Ensure MountUUID in a pull list is correctly translated to a Volume
94 // argument passed to writePulledBlock().
95 func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
96         defer func(f func(*RRVolumeManager, Volume, []byte, string) error) {
97                 writePulledBlock = f
98         }(writePulledBlock)
99         pullq := s.handler.Handler.(*router).pullq
100
101         for _, spec := range []struct {
102                 sendUUID     string
103                 expectVolume Volume
104         }{
105                 {
106                         sendUUID:     "",
107                         expectVolume: nil,
108                 },
109                 {
110                         sendUUID:     s.handler.volmgr.Mounts()[0].UUID,
111                         expectVolume: s.handler.volmgr.Mounts()[0].Volume,
112                 },
113         } {
114                 writePulledBlock = func(_ *RRVolumeManager, v Volume, _ []byte, _ string) error {
115                         c.Check(v, Equals, spec.expectVolume)
116                         return nil
117                 }
118
119                 resp := IssueRequest(s.handler, &RequestTester{
120                         uri:      "/pull",
121                         apiToken: s.cluster.SystemRootToken,
122                         method:   "PUT",
123                         requestBody: []byte(`[{
124                                 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
125                                 "servers":["server_1","server_2"],
126                                 "mount_uuid":"` + spec.sendUUID + `"}]`),
127                 })
128                 c.Assert(resp.Code, Equals, http.StatusOK)
129                 expectEqualWithin(c, time.Second, 0, func() interface{} {
130                         st := pullq.Status()
131                         return st.InProgress + st.Queued
132                 })
133         }
134 }
135
136 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
137         testData := PullWorkerTestData{
138                 name:         "TestPullWorkerPullList_with_two_locators",
139                 req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
140                 responseCode: http.StatusOK,
141                 responseBody: "Received 2 pull requests\n",
142                 readContent:  "hello",
143                 readError:    false,
144                 putError:     false,
145         }
146
147         s.performTest(testData, c)
148 }
149
150 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
151         testData := PullWorkerTestData{
152                 name:         "TestPullWorkerPullList_with_one_locator",
153                 req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
154                 responseCode: http.StatusOK,
155                 responseBody: "Received 1 pull requests\n",
156                 readContent:  "hola",
157                 readError:    false,
158                 putError:     false,
159         }
160
161         s.performTest(testData, c)
162 }
163
164 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
165         testData := PullWorkerTestData{
166                 name:         "TestPullWorker_error_on_get_one_locator",
167                 req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
168                 responseCode: http.StatusOK,
169                 responseBody: "Received 1 pull requests\n",
170                 readContent:  "unused",
171                 readError:    true,
172                 putError:     false,
173         }
174
175         s.performTest(testData, c)
176 }
177
178 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
179         testData := PullWorkerTestData{
180                 name:         "TestPullWorker_error_on_get_two_locators",
181                 req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
182                 responseCode: http.StatusOK,
183                 responseBody: "Received 2 pull requests\n",
184                 readContent:  "unused",
185                 readError:    true,
186                 putError:     false,
187         }
188
189         s.performTest(testData, c)
190 }
191
192 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
193         testData := PullWorkerTestData{
194                 name:         "TestPullWorker_error_on_put_one_locator",
195                 req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
196                 responseCode: http.StatusOK,
197                 responseBody: "Received 1 pull requests\n",
198                 readContent:  "hello hello",
199                 readError:    false,
200                 putError:     true,
201         }
202
203         s.performTest(testData, c)
204 }
205
206 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
207         testData := PullWorkerTestData{
208                 name:         "TestPullWorker_error_on_put_two_locators",
209                 req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
210                 responseCode: http.StatusOK,
211                 responseBody: "Received 2 pull requests\n",
212                 readContent:  "hello again",
213                 readError:    false,
214                 putError:     true,
215         }
216
217         s.performTest(testData, c)
218 }
219
220 // In this case, the item will not be placed on pullq
221 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
222         testData := PullWorkerTestData{
223                 name:         "TestPullWorkerPullList_with_two_locators",
224                 req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList, ""},
225                 responseCode: http.StatusUnauthorized,
226                 responseBody: "Unauthorized\n",
227                 readContent:  "hello",
228                 readError:    false,
229                 putError:     false,
230         }
231
232         s.performTest(testData, c)
233 }
234
235 func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
236         pullq := s.handler.Handler.(*router).pullq
237
238         s.testPullLists[testData.name] = testData.responseBody
239
240         processedPullLists := make(map[string]string)
241
242         // Override GetContent to mock keepclient Get functionality
243         defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
244                 GetContent = orig
245         }(GetContent)
246         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
247                 c.Assert(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(1))
248                 processedPullLists[testData.name] = testData.responseBody
249                 if testData.readError {
250                         err = errors.New("Error getting data")
251                         s.readError = err
252                         return
253                 }
254                 s.readContent = testData.readContent
255                 reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
256                 contentLength = int64(len(testData.readContent))
257                 return
258         }
259
260         // Override writePulledBlock to mock PutBlock functionality
261         defer func(orig func(*RRVolumeManager, Volume, []byte, string) error) { writePulledBlock = orig }(writePulledBlock)
262         writePulledBlock = func(_ *RRVolumeManager, v Volume, content []byte, locator string) error {
263                 if testData.putError {
264                         s.putError = errors.New("Error putting data")
265                         return s.putError
266                 }
267                 s.putContent = content
268                 return nil
269         }
270
271         c.Check(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(0))
272         c.Check(getStatusItem(s.handler, "PullQueue", "Queued"), Equals, float64(0))
273         c.Check(getStatusItem(s.handler, "Version"), Not(Equals), "")
274
275         response := IssueRequest(s.handler, &testData.req)
276         c.Assert(response.Code, Equals, testData.responseCode)
277         c.Assert(response.Body.String(), Equals, testData.responseBody)
278
279         expectEqualWithin(c, time.Second, 0, func() interface{} {
280                 st := pullq.Status()
281                 return st.InProgress + st.Queued
282         })
283
284         if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
285                 c.Assert(len(s.testPullLists), Equals, 2)
286                 c.Assert(len(processedPullLists), Equals, 1)
287                 c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
288                 c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
289                 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
290         } else {
291                 if testData.responseCode == http.StatusOK {
292                         c.Assert(len(s.testPullLists), Equals, 1)
293                         c.Assert(len(processedPullLists), Equals, 1)
294                         c.Assert(s.testPullLists[testData.name], NotNil)
295                 } else {
296                         c.Assert(len(s.testPullLists), Equals, 1)
297                         c.Assert(len(processedPullLists), Equals, 0)
298                 }
299         }
300
301         if testData.readError {
302                 c.Assert(s.readError, NotNil)
303         } else if testData.responseCode == http.StatusOK {
304                 c.Assert(s.readError, IsNil)
305                 c.Assert(s.readContent, Equals, testData.readContent)
306                 if testData.putError {
307                         c.Assert(s.putError, NotNil)
308                 } else {
309                         c.Assert(s.putError, IsNil)
310                         c.Assert(string(s.putContent), Equals, testData.readContent)
311                 }
312         }
313
314         expectChannelEmpty(c, pullq.NextItem)
315 }