Merge branch '11590-log-reuse'
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "io"
7         "io/ioutil"
8         "net/http"
9         "time"
10
11         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
12         "git.curoverse.com/arvados.git/sdk/go/keepclient"
13         . "gopkg.in/check.v1"
14 )
15
// Register the suite with gocheck so that its Test* methods are run.
var _ = Suite(&PullWorkerTestSuite{})

// PullWorkerTestSuite exercises the pull worker through the /pull
// handler. Its fields record what the mocked read and write hooks
// observed during the current test; SetUpTest resets them.
type PullWorkerTestSuite struct {
	testPullLists map[string]string // test name -> expected response body, filled in by performTest
	readContent   string            // content served by the mocked GetContent
	readError     error             // error returned by the mocked GetContent, if any
	putContent    []byte            // content handed to the mocked writePulledBlock
	putError      error             // error recorded by the mocked writePulledBlock, if any
}
25
26 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
27         theConfig.systemAuthToken = "arbitrary data manager token"
28         s.readContent = ""
29         s.readError = nil
30         s.putContent = []byte{}
31         s.putError = nil
32
33         // When a new pull request arrives, the old one will be overwritten.
34         // This behavior is verified using these two maps in the
35         // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
36         s.testPullLists = make(map[string]string)
37
38         KeepVM = MakeTestVolumeManager(2)
39
40         // Normally the pull queue and workers are started by main()
41         // -- tests need to set up their own.
42         arv, err := arvadosclient.MakeArvadosClient()
43         c.Assert(err, IsNil)
44         keepClient, err := keepclient.MakeKeepClient(arv)
45         c.Assert(err, IsNil)
46         pullq = NewWorkQueue()
47         go RunPullWorker(pullq, keepClient)
48 }
49
// TearDownTest closes and clears the volume manager and pull queue
// created by SetUpTest, then replaces the global configuration with a
// freshly started default one.
func (s *PullWorkerTestSuite) TearDownTest(c *C) {
	// Close each fixture before clearing the global that refers to it.
	KeepVM.Close()
	KeepVM = nil
	pullq.Close()
	pullq = nil
	teardown()
	// SetUpTest modified theConfig (systemAuthToken); start over from defaults.
	theConfig = DefaultConfig()
	theConfig.Start()
}
59
// firstPullList is a /pull request body with two entries: one block
// offered by two servers and one block offered by a single server.
var firstPullList = []byte(`[
                {
                        "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
                },{
                        "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
                        "servers":[
                                "server_3"
                        ]
                }
        ]`)
74
// secondPullList is a /pull request body with a single entry: one block
// offered by two servers.
var secondPullList = []byte(`[
                {
                        "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
                }
        ]`)
84
// PullWorkerTestData describes one scenario run by performTest: the
// request to send, the HTTP response expected from the handler, and how
// the mocked read and write hooks should behave.
type PullWorkerTestData struct {
	name         string        // scenario name; used as the key in the pull list bookkeeping maps
	req          RequestTester // request issued against the handler
	responseCode int           // expected HTTP status code
	responseBody string        // expected HTTP response body
	readContent  string        // data the mocked GetContent serves when readError is false
	readError    bool          // make the mocked GetContent return an error
	putError     bool          // make the mocked writePulledBlock record an error instead of the content
}
94
95 // Ensure MountUUID in a pull list is correctly translated to a Volume
96 // argument passed to writePulledBlock().
97 func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
98         defer func(f func(Volume, []byte, string)) {
99                 writePulledBlock = f
100         }(writePulledBlock)
101
102         for _, spec := range []struct {
103                 sendUUID     string
104                 expectVolume Volume
105         }{
106                 {
107                         sendUUID:     "",
108                         expectVolume: nil,
109                 },
110                 {
111                         sendUUID:     KeepVM.Mounts()[0].UUID,
112                         expectVolume: KeepVM.Mounts()[0].volume,
113                 },
114         } {
115                 writePulledBlock = func(v Volume, _ []byte, _ string) {
116                         c.Check(v, Equals, spec.expectVolume)
117                 }
118
119                 resp := IssueRequest(&RequestTester{
120                         uri:      "/pull",
121                         apiToken: theConfig.systemAuthToken,
122                         method:   "PUT",
123                         requestBody: []byte(`[{
124                                 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
125                                 "servers":["server_1","server_2"],
126                                 "mountuuid":"` + spec.sendUUID + `"}]`),
127                 })
128                 c.Assert(resp.Code, Equals, http.StatusOK)
129                 expectEqualWithin(c, time.Second, 0, func() interface{} {
130                         st := pullq.Status()
131                         return st.InProgress + st.Queued
132                 })
133         }
134 }
135
136 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
137         testData := PullWorkerTestData{
138                 name:         "TestPullWorkerPullList_with_two_locators",
139                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
140                 responseCode: http.StatusOK,
141                 responseBody: "Received 2 pull requests\n",
142                 readContent:  "hello",
143                 readError:    false,
144                 putError:     false,
145         }
146
147         s.performTest(testData, c)
148 }
149
150 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
151         testData := PullWorkerTestData{
152                 name:         "TestPullWorkerPullList_with_one_locator",
153                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
154                 responseCode: http.StatusOK,
155                 responseBody: "Received 1 pull requests\n",
156                 readContent:  "hola",
157                 readError:    false,
158                 putError:     false,
159         }
160
161         s.performTest(testData, c)
162 }
163
164 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
165         testData := PullWorkerTestData{
166                 name:         "TestPullWorker_error_on_get_one_locator",
167                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
168                 responseCode: http.StatusOK,
169                 responseBody: "Received 1 pull requests\n",
170                 readContent:  "unused",
171                 readError:    true,
172                 putError:     false,
173         }
174
175         s.performTest(testData, c)
176 }
177
178 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
179         testData := PullWorkerTestData{
180                 name:         "TestPullWorker_error_on_get_two_locators",
181                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
182                 responseCode: http.StatusOK,
183                 responseBody: "Received 2 pull requests\n",
184                 readContent:  "unused",
185                 readError:    true,
186                 putError:     false,
187         }
188
189         s.performTest(testData, c)
190 }
191
192 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
193         testData := PullWorkerTestData{
194                 name:         "TestPullWorker_error_on_put_one_locator",
195                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
196                 responseCode: http.StatusOK,
197                 responseBody: "Received 1 pull requests\n",
198                 readContent:  "hello hello",
199                 readError:    false,
200                 putError:     true,
201         }
202
203         s.performTest(testData, c)
204 }
205
206 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
207         testData := PullWorkerTestData{
208                 name:         "TestPullWorker_error_on_put_two_locators",
209                 req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
210                 responseCode: http.StatusOK,
211                 responseBody: "Received 2 pull requests\n",
212                 readContent:  "hello again",
213                 readError:    false,
214                 putError:     true,
215         }
216
217         s.performTest(testData, c)
218 }
219
220 // In this case, the item will not be placed on pullq
221 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
222         testData := PullWorkerTestData{
223                 name:         "TestPullWorkerPullList_with_two_locators",
224                 req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
225                 responseCode: http.StatusUnauthorized,
226                 responseBody: "Unauthorized\n",
227                 readContent:  "hello",
228                 readError:    false,
229                 putError:     false,
230         }
231
232         s.performTest(testData, c)
233 }
234
235 func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
236         s.testPullLists[testData.name] = testData.responseBody
237
238         processedPullLists := make(map[string]string)
239
240         // Override GetContent to mock keepclient Get functionality
241         defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
242                 GetContent = orig
243         }(GetContent)
244         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
245                 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
246                 processedPullLists[testData.name] = testData.responseBody
247                 if testData.readError {
248                         err = errors.New("Error getting data")
249                         s.readError = err
250                         return
251                 }
252                 s.readContent = testData.readContent
253                 reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
254                 contentLength = int64(len(testData.readContent))
255                 return
256         }
257
258         // Override writePulledBlock to mock PutBlock functionality
259         defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
260         writePulledBlock = func(v Volume, content []byte, locator string) {
261                 if testData.putError {
262                         s.putError = errors.New("Error putting data")
263                         return
264                 }
265                 s.putContent = content
266         }
267
268         c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
269         c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
270
271         response := IssueRequest(&testData.req)
272         c.Assert(response.Code, Equals, testData.responseCode)
273         c.Assert(response.Body.String(), Equals, testData.responseBody)
274
275         expectEqualWithin(c, time.Second, 0, func() interface{} {
276                 st := pullq.Status()
277                 return st.InProgress + st.Queued
278         })
279
280         if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
281                 c.Assert(len(s.testPullLists), Equals, 2)
282                 c.Assert(len(processedPullLists), Equals, 1)
283                 c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
284                 c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
285                 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
286         } else {
287                 if testData.responseCode == http.StatusOK {
288                         c.Assert(len(s.testPullLists), Equals, 1)
289                         c.Assert(len(processedPullLists), Equals, 1)
290                         c.Assert(s.testPullLists[testData.name], NotNil)
291                 } else {
292                         c.Assert(len(s.testPullLists), Equals, 1)
293                         c.Assert(len(processedPullLists), Equals, 0)
294                 }
295         }
296
297         if testData.readError {
298                 c.Assert(s.readError, NotNil)
299         } else if testData.responseCode == http.StatusOK {
300                 c.Assert(s.readError, IsNil)
301                 c.Assert(s.readContent, Equals, testData.readContent)
302                 if testData.putError {
303                         c.Assert(s.putError, NotNil)
304                 } else {
305                         c.Assert(s.putError, IsNil)
306                         c.Assert(string(s.putContent), Equals, testData.readContent)
307                 }
308         }
309
310         expectChannelEmpty(c, pullq.NextItem)
311 }