7167: some more error tests such as error getting block from src and error putting...
[arvados.git] / tools / keep-rsync / keep-rsync_test.go
1 package main
2
3 import (
4         "crypto/md5"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "strings"
9         "testing"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
13         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
14         "git.curoverse.com/arvados.git/sdk/go/keepclient"
15
16         . "gopkg.in/check.v1"
17 )
18
19 // Gocheck boilerplate
20 func Test(t *testing.T) {
21         TestingT(t)
22 }
23
24 // Gocheck boilerplate
25 var _ = Suite(&ServerRequiredSuite{})
26
27 // Tests that require the Keep server running
28 type ServerRequiredSuite struct{}
29
30 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
31 }
32
33 func (s *ServerRequiredSuite) SetUpTest(c *C) {
34         arvadostest.ResetEnv()
35
36         // reset all variables between tests
37         srcConfig = arvadosclient.APIConfig{}
38         dstConfig = arvadosclient.APIConfig{}
39         blobSigningKey = ""
40         srcKeepServicesJSON = ""
41         dstKeepServicesJSON = ""
42         replications = 0
43         prefix = ""
44         arvSrc = arvadosclient.ArvadosClient{}
45         arvDst = arvadosclient.ArvadosClient{}
46         kcSrc = &keepclient.KeepClient{}
47         kcDst = &keepclient.KeepClient{}
48 }
49
50 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
51         arvadostest.StopKeep()
52         arvadostest.StopAPI()
53 }
54
55 var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
56
57 // Testing keep-rsync needs two sets of keep services: src and dst.
58 // The test setup hence tweaks keep-rsync initialization to achieve this.
59 // First invoke initializeKeepRsync and then invoke StartKeepWithParams
60 // to create the keep servers to be used as destination.
61 func setupRsync(c *C, enforcePermissions bool, setupDstServers bool) {
62         // srcConfig
63         srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
64         srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
65         srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
66
67         // dstConfig
68         dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
69         dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
70         dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
71
72         if enforcePermissions {
73                 blobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
74         }
75
76         // Start API and Keep servers
77         arvadostest.StartAPI()
78         arvadostest.StartKeepWithParams(false, enforcePermissions)
79
80         // initialize keep-rsync
81         err := initializeKeepRsync()
82         c.Check(err, IsNil)
83
84         // Create an additional keep server to be used as destination and reload kcDst
85         // Set replications to 1 since those many keep servers were created for dst.
86         if setupDstServers {
87                 arvadostest.StartKeepWithParams(true, enforcePermissions)
88                 replications = 1
89
90                 kcDst, err = keepclient.MakeKeepClient(&arvDst)
91                 c.Check(err, IsNil)
92                 kcDst.Want_replicas = 1
93         }
94 }
95
96 // Test readConfigFromFile method
97 func (s *ServerRequiredSuite) TestReadConfigFromFile(c *C) {
98         // Setup a test config file
99         file, err := ioutil.TempFile(os.TempDir(), "config")
100         c.Check(err, IsNil)
101         defer os.Remove(file.Name())
102
103         fileContent := "ARVADOS_API_HOST=testhost\n"
104         fileContent += "ARVADOS_API_TOKEN=testtoken\n"
105         fileContent += "ARVADOS_API_HOST_INSECURE=true\n"
106         fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
107
108         _, err = file.Write([]byte(fileContent))
109
110         // Invoke readConfigFromFile method with this test filename
111         config, err := readConfigFromFile(file.Name())
112         c.Check(err, IsNil)
113         c.Assert(config.APIHost, Equals, "testhost")
114         c.Assert(config.APIToken, Equals, "testtoken")
115         c.Assert(config.APIHostInsecure, Equals, true)
116         c.Assert(config.ExternalClient, Equals, false)
117         c.Assert(blobSigningKey, Equals, "abcdefg")
118 }
119
120 // Test keep-rsync initialization, with src and dst keep servers.
121 // Do a Put and Get in src, both of which should succeed.
122 // Do a Put and Get in dst, both of which should succeed.
123 // Do a Get in dst for the src hash, which should raise block not found error.
124 // Do a Get in src for the dst hash, which should raise block not found error.
125 func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) {
126         setupRsync(c, false, true)
127
128         // Put a block in src using kcSrc and Get it
129         srcData := []byte("test-data1")
130         locatorInSrc := fmt.Sprintf("%x", md5.Sum(srcData))
131
132         hash, rep, err := kcSrc.PutB(srcData)
133         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInSrc))
134         c.Check(rep, Equals, 2)
135         c.Check(err, Equals, nil)
136
137         reader, blocklen, _, err := kcSrc.Get(locatorInSrc)
138         c.Check(err, IsNil)
139         c.Check(blocklen, Equals, int64(10))
140         all, err := ioutil.ReadAll(reader)
141         c.Check(all, DeepEquals, srcData)
142
143         // Put a different block in src using kcSrc and Get it
144         dstData := []byte("test-data2")
145         locatorInDst := fmt.Sprintf("%x", md5.Sum(dstData))
146
147         hash, rep, err = kcDst.PutB(dstData)
148         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInDst))
149         c.Check(rep, Equals, 1)
150         c.Check(err, Equals, nil)
151
152         reader, blocklen, _, err = kcDst.Get(locatorInDst)
153         c.Check(err, IsNil)
154         c.Check(blocklen, Equals, int64(10))
155         all, err = ioutil.ReadAll(reader)
156         c.Check(all, DeepEquals, dstData)
157
158         // Get srcLocator using kcDst should fail with Not Found error
159         _, _, _, err = kcDst.Get(locatorInSrc)
160         c.Assert(err.Error(), Equals, "Block not found")
161
162         // Get dstLocator using kcSrc should fail with Not Found error
163         _, _, _, err = kcSrc.Get(locatorInDst)
164         c.Assert(err.Error(), Equals, "Block not found")
165 }
166
167 // Test keep-rsync initialization, with srcKeepServicesJSON
168 func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
169         srcKeepServicesJSON = testKeepServicesJSON
170
171         setupRsync(c, false, true)
172
173         localRoots := kcSrc.LocalRoots()
174         c.Check(localRoots != nil, Equals, true)
175
176         foundIt := false
177         for k := range localRoots {
178                 if k == "zzzzz-bi6l4-123456789012340" {
179                         foundIt = true
180                 }
181         }
182         c.Check(foundIt, Equals, true)
183
184         foundIt = false
185         for k := range localRoots {
186                 if k == "zzzzz-bi6l4-123456789012341" {
187                         foundIt = true
188                 }
189         }
190         c.Check(foundIt, Equals, true)
191 }
192
193 // Test keep-rsync initialization, with src and dst keep servers with blobSigningKey.
194 // Do a Put and Get in src, both of which should succeed.
195 // Do a Put and Get in dst, both of which should succeed.
196 // Do a Get in dst for the src hash, which should raise block not found error.
197 // Do a Get in src for the dst hash, which should raise block not found error.
198 func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShouldFail(c *C) {
199         setupRsync(c, true, true)
200
201         // Put a block in src using kcSrc and Get it
202         srcData := []byte("test-data1")
203         locatorInSrc := fmt.Sprintf("%x", md5.Sum(srcData))
204
205         hash, rep, err := kcSrc.PutB(srcData)
206         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInSrc))
207         c.Check(rep, Equals, 2)
208         c.Check(err, Equals, nil)
209
210         tomorrow := time.Now().AddDate(0, 0, 1)
211         signedLocator := keepclient.SignLocator(locatorInSrc, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey))
212
213         reader, blocklen, _, err := kcSrc.Get(signedLocator)
214         c.Check(err, IsNil)
215         c.Check(blocklen, Equals, int64(10))
216         all, err := ioutil.ReadAll(reader)
217         c.Check(all, DeepEquals, srcData)
218
219         // Put a different block in src using kcSrc and Get it
220         dstData := []byte("test-data2")
221         locatorInDst := fmt.Sprintf("%x", md5.Sum(dstData))
222
223         hash, rep, err = kcDst.PutB(dstData)
224         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInDst))
225         c.Check(rep, Equals, 1)
226         c.Check(err, Equals, nil)
227
228         signedLocator = keepclient.SignLocator(locatorInDst, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
229
230         reader, blocklen, _, err = kcDst.Get(signedLocator)
231         c.Check(err, IsNil)
232         c.Check(blocklen, Equals, int64(10))
233         all, err = ioutil.ReadAll(reader)
234         c.Check(all, DeepEquals, dstData)
235
236         // Get srcLocator using kcDst should fail with Not Found error
237         signedLocator = keepclient.SignLocator(locatorInSrc, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
238         _, _, _, err = kcDst.Get(locatorInSrc)
239         c.Assert(err.Error(), Equals, "Block not found")
240
241         // Get dstLocator using kcSrc should fail with Not Found error
242         signedLocator = keepclient.SignLocator(locatorInDst, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey))
243         _, _, _, err = kcSrc.Get(locatorInDst)
244         c.Assert(err.Error(), Equals, "Block not found")
245 }
246
247 // Test keep-rsync initialization with default replications count
248 func (s *ServerRequiredSuite) TestInitializeRsyncDefaultReplicationsCount(c *C) {
249         setupRsync(c, false, false)
250
251         // Must have got default replications value as 2 from dst discovery document
252         c.Assert(replications, Equals, 2)
253 }
254
255 // Test keep-rsync initialization with replications count argument
256 func (s *ServerRequiredSuite) TestInitializeRsyncReplicationsCount(c *C) {
257         // First set replications to 3 to mimic passing input argument
258         replications = 3
259
260         setupRsync(c, false, false)
261
262         // Since replications value is provided, default is not used
263         c.Assert(replications, Equals, 3)
264 }
265
266 // Put some blocks in Src and some more in Dst
267 // And copy missing blocks from Src to Dst
268 func (s *ServerRequiredSuite) TestKeepRsync(c *C) {
269         testKeepRsync(c, false, "")
270 }
271
272 // Put some blocks in Src and some more in Dst with blob signing enabled.
273 // And copy missing blocks from Src to Dst
274 func (s *ServerRequiredSuite) TestKeepRsync_WithBlobSigning(c *C) {
275         testKeepRsync(c, true, "")
276 }
277
278 // Put some blocks in Src and some more in Dst
279 // Use prefix while doing rsync
280 // And copy missing blocks from Src to Dst
281 func (s *ServerRequiredSuite) TestKeepRsync_WithPrefix(c *C) {
282         data := []byte("test-data-4")
283         hash := fmt.Sprintf("%x", md5.Sum(data))
284
285         testKeepRsync(c, false, hash[0:3])
286 }
287
288 // Put some blocks in Src and some more in Dst
289 // Use prefix not in src while doing rsync
290 // And copy missing blocks from Src to Dst
291 func (s *ServerRequiredSuite) TestKeepRsync_WithNoSuchPrefixInSrc(c *C) {
292         testKeepRsync(c, false, "999")
293 }
294
295 // Put 5 blocks in src. Put 2 of those blocks in dst
296 // Hence there are 3 additional blocks in src
297 // Also, put 2 extra blocks in dst; they are hence only in dst
298 // Run rsync and verify that those 7 blocks are now available in dst
299 func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) {
300         setupRsync(c, enforcePermissions, true)
301
302         prefix = indexPrefix
303
304         // setupTestData
305         setupTestData(c, enforcePermissions, prefix)
306
307         err := performKeepRsync()
308         c.Check(err, IsNil)
309
310         // Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
311         dstIndex, err := getUniqueLocators(kcDst, "")
312         c.Check(err, IsNil)
313
314         if prefix == "" {
315                 for _, locator := range srcLocators {
316                         _, ok := dstIndex[locator]
317                         c.Assert(ok, Equals, true)
318                 }
319         } else {
320                 for _, locator := range srcLocatorsMatchingPrefix {
321                         _, ok := dstIndex[locator]
322                         c.Assert(ok, Equals, true)
323                 }
324         }
325
326         for _, locator := range extraDstLocators {
327                 _, ok := dstIndex[locator]
328                 c.Assert(ok, Equals, true)
329         }
330
331         if prefix == "" {
332                 // all blocks from src and the two extra blocks
333                 c.Assert(len(dstIndex), Equals, len(srcLocators)+len(extraDstLocators))
334         } else {
335                 // one matching prefix, 2 that were initially copied into dst along with src, and the extra blocks
336                 c.Assert(len(dstIndex), Equals, len(srcLocatorsMatchingPrefix)+len(extraDstLocators)+2)
337         }
338 }
339
340 // Setup test data in src and dst.
341 var srcLocators []string
342 var srcLocatorsMatchingPrefix []string
343 var dstLocators []string
344 var extraDstLocators []string
345
346 func setupTestData(c *C, enforcePermissions bool, indexPrefix string) {
347         srcLocators = []string{}
348         srcLocatorsMatchingPrefix = []string{}
349         dstLocators = []string{}
350         extraDstLocators = []string{}
351
352         tomorrow := time.Now().AddDate(0, 0, 1)
353
354         // Put a few blocks in src using kcSrc
355         for i := 0; i < 5; i++ {
356                 data := []byte(fmt.Sprintf("test-data-%d", i))
357                 hash := fmt.Sprintf("%x", md5.Sum(data))
358
359                 hash2, rep, err := kcSrc.PutB(data)
360                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
361                 c.Check(rep, Equals, 2)
362                 c.Check(err, IsNil)
363
364                 getLocator := hash
365                 if enforcePermissions {
366                         getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey))
367                 }
368
369                 reader, blocklen, _, err := kcSrc.Get(getLocator)
370                 c.Check(err, IsNil)
371                 c.Check(blocklen, Equals, int64(11))
372                 all, err := ioutil.ReadAll(reader)
373                 c.Check(all, DeepEquals, data)
374
375                 srcLocators = append(srcLocators, fmt.Sprintf("%s+%d", hash, blocklen))
376                 if strings.HasPrefix(hash, indexPrefix) {
377                         srcLocatorsMatchingPrefix = append(srcLocatorsMatchingPrefix, fmt.Sprintf("%s+%d", hash, blocklen))
378                 }
379         }
380
381         // Put first two of those src blocks in dst using kcDst
382         for i := 0; i < 2; i++ {
383                 data := []byte(fmt.Sprintf("test-data-%d", i))
384                 hash := fmt.Sprintf("%x", md5.Sum(data))
385
386                 hash2, rep, err := kcDst.PutB(data)
387                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
388                 c.Check(rep, Equals, 1)
389                 c.Check(err, IsNil)
390
391                 getLocator := hash
392                 if enforcePermissions {
393                         getLocator = keepclient.SignLocator(getLocator, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
394                 }
395
396                 reader, blocklen, _, err := kcDst.Get(getLocator)
397                 c.Check(err, IsNil)
398                 c.Check(blocklen, Equals, int64(11))
399                 all, err := ioutil.ReadAll(reader)
400                 c.Check(all, DeepEquals, data)
401
402                 dstLocators = append(dstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
403         }
404
405         // Put two more blocks in dst; they are not in src at all
406         for i := 0; i < 2; i++ {
407                 data := []byte(fmt.Sprintf("other-data-%d", i))
408                 hash := fmt.Sprintf("%x", md5.Sum(data))
409
410                 hash2, rep, err := kcDst.PutB(data)
411                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+12(\+.+)?$`, hash))
412                 c.Check(rep, Equals, 1)
413                 c.Check(err, IsNil)
414
415                 getLocator := hash
416                 if enforcePermissions {
417                         getLocator = keepclient.SignLocator(getLocator, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
418                 }
419
420                 reader, blocklen, _, err := kcDst.Get(getLocator)
421                 c.Check(err, IsNil)
422                 c.Check(blocklen, Equals, int64(12))
423                 all, err := ioutil.ReadAll(reader)
424                 c.Check(all, DeepEquals, data)
425
426                 extraDstLocators = append(extraDstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
427         }
428 }
429
430 // Setup rsync using srcKeepServicesJSON with fake keepservers.
431 // Expect error during performKeepRsync due to unreachable src keepservers.
432 func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeSrcKeepservers(c *C) {
433         srcKeepServicesJSON = testKeepServicesJSON
434
435         setupRsync(c, false, false)
436
437         err := performKeepRsync()
438         c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
439 }
440
441 // Setup rsync using dstKeepServicesJSON with fake keepservers.
442 // Expect error during performKeepRsync due to unreachable dst keepservers.
443 func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeDstKeepservers(c *C) {
444         dstKeepServicesJSON = testKeepServicesJSON
445
446         setupRsync(c, false, false)
447
448         err := performKeepRsync()
449         c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
450 }
451
452 // Test rsync with signature error during Get from src.
453 func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorGettingBlockFromSrc(c *C) {
454         setupRsync(c, true, true)
455
456         // put some blocks in src and dst
457         setupTestData(c, true, "")
458
459         // Change blob signing key to a fake key, so that Get from src fails
460         blobSigningKey = "123456789012345678901234yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
461
462         err := performKeepRsync()
463         c.Check(err.Error(), Equals, "Block not found")
464 }
465
466 // Test rsync with error during Put to src.
467 func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorPuttingBlockInDst(c *C) {
468         setupRsync(c, false, true)
469
470         // put some blocks in src and dst
471         setupTestData(c, true, "")
472
473         // Increase Want_replicas on dst to result in insufficient replicas error during Put
474         kcDst.Want_replicas = 2
475
476         err := performKeepRsync()
477         c.Check(err.Error(), Equals, "Could not write sufficient replicas")
478 }