7253: update manifest.parseManifestStream to raise error when the given manifest...
[arvados.git] / services / keepstore / pull_worker_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         . "gopkg.in/check.v1"
9         "io"
10         "net/http"
11         "testing"
12         "time"
13 )
14
15 type PullWorkerTestSuite struct{}
16
17 // Gocheck boilerplate
18 func TestPullWorker(t *testing.T) {
19         TestingT(t)
20 }
21
22 // Gocheck boilerplate
23 var _ = Suite(&PullWorkerTestSuite{})
24
25 var testPullLists map[string]string
26 var readContent string
27 var readError error
28 var putContent []byte
29 var putError error
30 var currentTestData PullWorkerTestData
31
32 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
33         readContent = ""
34         readError = nil
35         putContent = []byte("")
36         putError = nil
37
38         // When a new pull request arrives, the old one will be overwritten.
39         // This behavior is verified using these two maps in the
40         // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
41         testPullLists = make(map[string]string)
42 }
43
44 // Since keepstore does not come into picture in tests,
45 // we need to explicitly start the goroutine in tests.
46 func RunTestPullWorker(c *C) {
47         arv, err := arvadosclient.MakeArvadosClient()
48         c.Assert(err, Equals, nil)
49         keepClient, err := keepclient.MakeKeepClient(&arv)
50         c.Assert(err, Equals, nil)
51
52         pullq = NewWorkQueue()
53         go RunPullWorker(pullq, keepClient)
54 }
55
// firstPullList is a JSON pull list with two entries: one locator with two
// candidate servers and one locator with a single server. It is used as the
// request body of the "two locators" scenarios.
var firstPullList = []byte(`[
                {
                        "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
                },{
                        "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
                        "servers":[
                                "server_3"
                        ]
                }
        ]`)
70
// secondPullList is a JSON pull list with a single entry (one locator, two
// candidate servers). It is used as the request body of the "one locator"
// scenarios.
var secondPullList = []byte(`[
                {
                        "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
                }
        ]`)
80
81 type PullWorkerTestData struct {
82         name         string
83         req          RequestTester
84         responseCode int
85         responseBody string
86         readContent  string
87         readError    bool
88         putError     bool
89 }
90
91 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
92         defer teardown()
93
94         dataManagerToken = "DATA MANAGER TOKEN"
95         testData := PullWorkerTestData{
96                 name:         "TestPullWorkerPullList_with_two_locators",
97                 req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
98                 responseCode: http.StatusOK,
99                 responseBody: "Received 2 pull requests\n",
100                 readContent:  "hello",
101                 readError:    false,
102                 putError:     false,
103         }
104
105         performTest(testData, c)
106 }
107
108 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
109         defer teardown()
110
111         dataManagerToken = "DATA MANAGER TOKEN"
112         testData := PullWorkerTestData{
113                 name:         "TestPullWorkerPullList_with_one_locator",
114                 req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
115                 responseCode: http.StatusOK,
116                 responseBody: "Received 1 pull requests\n",
117                 readContent:  "hola",
118                 readError:    false,
119                 putError:     false,
120         }
121
122         performTest(testData, c)
123 }
124
125 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
126         defer teardown()
127
128         dataManagerToken = "DATA MANAGER TOKEN"
129         testData := PullWorkerTestData{
130                 name:         "TestPullWorker_error_on_get_one_locator",
131                 req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
132                 responseCode: http.StatusOK,
133                 responseBody: "Received 1 pull requests\n",
134                 readContent:  "unused",
135                 readError:    true,
136                 putError:     false,
137         }
138
139         performTest(testData, c)
140 }
141
142 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
143         defer teardown()
144
145         dataManagerToken = "DATA MANAGER TOKEN"
146         testData := PullWorkerTestData{
147                 name:         "TestPullWorker_error_on_get_two_locators",
148                 req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
149                 responseCode: http.StatusOK,
150                 responseBody: "Received 2 pull requests\n",
151                 readContent:  "unused",
152                 readError:    true,
153                 putError:     false,
154         }
155
156         performTest(testData, c)
157 }
158
159 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
160         defer teardown()
161
162         dataManagerToken = "DATA MANAGER TOKEN"
163         testData := PullWorkerTestData{
164                 name:         "TestPullWorker_error_on_put_one_locator",
165                 req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
166                 responseCode: http.StatusOK,
167                 responseBody: "Received 1 pull requests\n",
168                 readContent:  "hello hello",
169                 readError:    false,
170                 putError:     true,
171         }
172
173         performTest(testData, c)
174 }
175
176 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
177         defer teardown()
178
179         dataManagerToken = "DATA MANAGER TOKEN"
180         testData := PullWorkerTestData{
181                 name:         "TestPullWorker_error_on_put_two_locators",
182                 req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
183                 responseCode: http.StatusOK,
184                 responseBody: "Received 2 pull requests\n",
185                 readContent:  "hello again",
186                 readError:    false,
187                 putError:     true,
188         }
189
190         performTest(testData, c)
191 }
192
193 // When a new pull request arrives, the old one is replaced. This test
194 // is used to check that behavior by first putting an item on the queue,
195 // and then performing the test. Thus the "testPullLists" has two entries;
196 // however, processedPullLists will see only the newest item in the list.
197 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
198         defer teardown()
199
200         var firstInput = []int{1}
201         pullq = NewWorkQueue()
202         pullq.ReplaceQueue(makeTestWorkList(firstInput))
203         testPullLists["Added_before_actual_test_item"] = string(1)
204
205         dataManagerToken = "DATA MANAGER TOKEN"
206         testData := PullWorkerTestData{
207                 name:         "TestPullWorkerPullList_with_two_items_latest_replacing_old",
208                 req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
209                 responseCode: http.StatusOK,
210                 responseBody: "Received 1 pull requests\n",
211                 readContent:  "hola de nuevo",
212                 readError:    false,
213                 putError:     false,
214         }
215
216         performTest(testData, c)
217 }
218
219 // In this case, the item will not be placed on pullq
220 func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
221         defer teardown()
222
223         dataManagerToken = "DATA MANAGER TOKEN"
224
225         testData := PullWorkerTestData{
226                 name:         "TestPullWorkerPullList_with_two_locators",
227                 req:          RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
228                 responseCode: http.StatusUnauthorized,
229                 responseBody: "Unauthorized\n",
230                 readContent:  "hello",
231                 readError:    false,
232                 putError:     false,
233         }
234
235         performTest(testData, c)
236 }
237
238 func performTest(testData PullWorkerTestData, c *C) {
239         KeepVM = MakeTestVolumeManager(2)
240         defer KeepVM.Close()
241
242         RunTestPullWorker(c)
243         defer pullq.Close()
244
245         currentTestData = testData
246         testPullLists[testData.name] = testData.responseBody
247
248         processedPullLists := make(map[string]string)
249
250         // Override GetContent to mock keepclient Get functionality
251         defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
252                 GetContent = orig
253         }(GetContent)
254         GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
255                 c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
256                 processedPullLists[testData.name] = testData.responseBody
257                 if testData.readError {
258                         err = errors.New("Error getting data")
259                         readError = err
260                         return nil, 0, "", err
261                 }
262                 readContent = testData.readContent
263                 cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
264                 var rc io.ReadCloser
265                 rc = cb
266                 return rc, int64(len(testData.readContent)), "", nil
267         }
268
269         // Override PutContent to mock PutBlock functionality
270         defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
271         PutContent = func(content []byte, locator string) (err error) {
272                 if testData.putError {
273                         err = errors.New("Error putting data")
274                         putError = err
275                         return err
276                 }
277                 putContent = content
278                 return nil
279         }
280
281         c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
282         c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
283
284         response := IssueRequest(&testData.req)
285         c.Assert(response.Code, Equals, testData.responseCode)
286         c.Assert(response.Body.String(), Equals, testData.responseBody)
287
288         expectEqualWithin(c, time.Second, 0, func() interface{} {
289                 st := pullq.Status()
290                 return st.InProgress + st.Queued
291         })
292
293         if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
294                 c.Assert(len(testPullLists), Equals, 2)
295                 c.Assert(len(processedPullLists), Equals, 1)
296                 c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
297                 c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
298                 c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
299         } else {
300                 if testData.responseCode == http.StatusOK {
301                         c.Assert(len(testPullLists), Equals, 1)
302                         c.Assert(len(processedPullLists), Equals, 1)
303                         c.Assert(testPullLists[testData.name], NotNil)
304                 } else {
305                         c.Assert(len(testPullLists), Equals, 1)
306                         c.Assert(len(processedPullLists), Equals, 0)
307                 }
308         }
309
310         if testData.readError {
311                 c.Assert(readError, NotNil)
312         } else if testData.responseCode == http.StatusOK {
313                 c.Assert(readError, IsNil)
314                 c.Assert(readContent, Equals, testData.readContent)
315                 if testData.putError {
316                         c.Assert(putError, NotNil)
317                 } else {
318                         c.Assert(putError, IsNil)
319                         c.Assert(string(putContent), Equals, testData.readContent)
320                 }
321         }
322
323         expectChannelEmpty(c, pullq.NextItem)
324 }
325
// ClosingBuffer wraps a *bytes.Buffer and adds a Close method, so that an
// in-memory buffer can be used where a value with Read and Close is needed.
type ClosingBuffer struct {
	*bytes.Buffer
}

// Close does nothing and always reports success.
func (cb *ClosingBuffer) Close() error {
	return nil
}