7167: break load config logic out of main into loadConfig func and add several tests.
[arvados.git] / tools / keep-rsync / keep-rsync_test.go
1 package main
2
3 import (
4         "crypto/md5"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "strings"
9         "testing"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
13         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
14         "git.curoverse.com/arvados.git/sdk/go/keepclient"
15
16         . "gopkg.in/check.v1"
17 )
18
19 // Gocheck boilerplate
20 func Test(t *testing.T) {
21         TestingT(t)
22 }
23
24 // Gocheck boilerplate
25 var _ = Suite(&ServerRequiredSuite{})
26
27 // Tests that require the Keep server running
28 type ServerRequiredSuite struct{}
29
30 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
31 }
32
33 func (s *ServerRequiredSuite) SetUpTest(c *C) {
34         arvadostest.ResetEnv()
35
36         // reset all variables between tests
37         srcConfig = arvadosclient.APIConfig{}
38         dstConfig = arvadosclient.APIConfig{}
39         blobSigningKey = ""
40         srcKeepServicesJSON = ""
41         dstKeepServicesJSON = ""
42         replications = 0
43         prefix = ""
44         srcConfigFile = ""
45         dstConfigFile = ""
46         arvSrc = arvadosclient.ArvadosClient{}
47         arvDst = arvadosclient.ArvadosClient{}
48         kcSrc = &keepclient.KeepClient{}
49         kcDst = &keepclient.KeepClient{}
50 }
51
52 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
53         arvadostest.StopKeep()
54         arvadostest.StopAPI()
55 }
56
57 var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
58
59 // Testing keep-rsync needs two sets of keep services: src and dst.
60 // The test setup hence tweaks keep-rsync initialization to achieve this.
61 // First invoke initializeKeepRsync and then invoke StartKeepWithParams
62 // to create the keep servers to be used as destination.
63 func setupRsync(c *C, enforcePermissions bool, setupDstServers bool) {
64         // srcConfig
65         srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
66         srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
67         srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
68
69         // dstConfig
70         dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
71         dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
72         dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
73
74         if enforcePermissions {
75                 blobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
76         }
77
78         // Start API and Keep servers
79         arvadostest.StartAPI()
80         arvadostest.StartKeepWithParams(false, enforcePermissions)
81
82         // initialize keep-rsync
83         err := initializeKeepRsync()
84         c.Check(err, IsNil)
85
86         // Create an additional keep server to be used as destination and reload kcDst
87         // Set replications to 1 since those many keep servers were created for dst.
88         if setupDstServers {
89                 arvadostest.StartKeepWithParams(true, enforcePermissions)
90                 replications = 1
91
92                 kcDst, err = keepclient.MakeKeepClient(&arvDst)
93                 c.Check(err, IsNil)
94                 kcDst.Want_replicas = 1
95         }
96 }
97
98 // Test keep-rsync initialization, with src and dst keep servers.
99 // Do a Put and Get in src, both of which should succeed.
100 // Do a Put and Get in dst, both of which should succeed.
101 // Do a Get in dst for the src hash, which should raise block not found error.
102 // Do a Get in src for the dst hash, which should raise block not found error.
103 func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) {
104         setupRsync(c, false, true)
105
106         // Put a block in src using kcSrc and Get it
107         srcData := []byte("test-data1")
108         locatorInSrc := fmt.Sprintf("%x", md5.Sum(srcData))
109
110         hash, rep, err := kcSrc.PutB(srcData)
111         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInSrc))
112         c.Check(rep, Equals, 2)
113         c.Check(err, Equals, nil)
114
115         reader, blocklen, _, err := kcSrc.Get(locatorInSrc)
116         c.Check(err, IsNil)
117         c.Check(blocklen, Equals, int64(10))
118         all, err := ioutil.ReadAll(reader)
119         c.Check(all, DeepEquals, srcData)
120
121         // Put a different block in src using kcSrc and Get it
122         dstData := []byte("test-data2")
123         locatorInDst := fmt.Sprintf("%x", md5.Sum(dstData))
124
125         hash, rep, err = kcDst.PutB(dstData)
126         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInDst))
127         c.Check(rep, Equals, 1)
128         c.Check(err, Equals, nil)
129
130         reader, blocklen, _, err = kcDst.Get(locatorInDst)
131         c.Check(err, IsNil)
132         c.Check(blocklen, Equals, int64(10))
133         all, err = ioutil.ReadAll(reader)
134         c.Check(all, DeepEquals, dstData)
135
136         // Get srcLocator using kcDst should fail with Not Found error
137         _, _, _, err = kcDst.Get(locatorInSrc)
138         c.Assert(err.Error(), Equals, "Block not found")
139
140         // Get dstLocator using kcSrc should fail with Not Found error
141         _, _, _, err = kcSrc.Get(locatorInDst)
142         c.Assert(err.Error(), Equals, "Block not found")
143 }
144
145 // Test keep-rsync initialization, with srcKeepServicesJSON
146 func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
147         srcKeepServicesJSON = testKeepServicesJSON
148
149         setupRsync(c, false, true)
150
151         localRoots := kcSrc.LocalRoots()
152         c.Check(localRoots != nil, Equals, true)
153
154         foundIt := false
155         for k := range localRoots {
156                 if k == "zzzzz-bi6l4-123456789012340" {
157                         foundIt = true
158                 }
159         }
160         c.Check(foundIt, Equals, true)
161
162         foundIt = false
163         for k := range localRoots {
164                 if k == "zzzzz-bi6l4-123456789012341" {
165                         foundIt = true
166                 }
167         }
168         c.Check(foundIt, Equals, true)
169 }
170
171 // Test keep-rsync initialization, with src and dst keep servers with blobSigningKey.
172 // Do a Put and Get in src, both of which should succeed.
173 // Do a Put and Get in dst, both of which should succeed.
174 // Do a Get in dst for the src hash, which should raise block not found error.
175 // Do a Get in src for the dst hash, which should raise block not found error.
176 func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShouldFail(c *C) {
177         setupRsync(c, true, true)
178
179         // Put a block in src using kcSrc and Get it
180         srcData := []byte("test-data1")
181         locatorInSrc := fmt.Sprintf("%x", md5.Sum(srcData))
182
183         hash, rep, err := kcSrc.PutB(srcData)
184         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInSrc))
185         c.Check(rep, Equals, 2)
186         c.Check(err, Equals, nil)
187
188         tomorrow := time.Now().AddDate(0, 0, 1)
189         signedLocator := keepclient.SignLocator(locatorInSrc, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey))
190
191         reader, blocklen, _, err := kcSrc.Get(signedLocator)
192         c.Check(err, IsNil)
193         c.Check(blocklen, Equals, int64(10))
194         all, err := ioutil.ReadAll(reader)
195         c.Check(all, DeepEquals, srcData)
196
197         // Put a different block in src using kcSrc and Get it
198         dstData := []byte("test-data2")
199         locatorInDst := fmt.Sprintf("%x", md5.Sum(dstData))
200
201         hash, rep, err = kcDst.PutB(dstData)
202         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInDst))
203         c.Check(rep, Equals, 1)
204         c.Check(err, Equals, nil)
205
206         signedLocator = keepclient.SignLocator(locatorInDst, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
207
208         reader, blocklen, _, err = kcDst.Get(signedLocator)
209         c.Check(err, IsNil)
210         c.Check(blocklen, Equals, int64(10))
211         all, err = ioutil.ReadAll(reader)
212         c.Check(all, DeepEquals, dstData)
213
214         // Get srcLocator using kcDst should fail with Not Found error
215         signedLocator = keepclient.SignLocator(locatorInSrc, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
216         _, _, _, err = kcDst.Get(locatorInSrc)
217         c.Assert(err.Error(), Equals, "Block not found")
218
219         // Get dstLocator using kcSrc should fail with Not Found error
220         signedLocator = keepclient.SignLocator(locatorInDst, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey))
221         _, _, _, err = kcSrc.Get(locatorInDst)
222         c.Assert(err.Error(), Equals, "Block not found")
223 }
224
225 // Test keep-rsync initialization with default replications count
226 func (s *ServerRequiredSuite) TestInitializeRsyncDefaultReplicationsCount(c *C) {
227         setupRsync(c, false, false)
228
229         // Must have got default replications value as 2 from dst discovery document
230         c.Assert(replications, Equals, 2)
231 }
232
233 // Test keep-rsync initialization with replications count argument
234 func (s *ServerRequiredSuite) TestInitializeRsyncReplicationsCount(c *C) {
235         // First set replications to 3 to mimic passing input argument
236         replications = 3
237
238         setupRsync(c, false, false)
239
240         // Since replications value is provided, default is not used
241         c.Assert(replications, Equals, 3)
242 }
243
244 // Put some blocks in Src and some more in Dst
245 // And copy missing blocks from Src to Dst
246 func (s *ServerRequiredSuite) TestKeepRsync(c *C) {
247         testKeepRsync(c, false, "")
248 }
249
250 // Put some blocks in Src and some more in Dst with blob signing enabled.
251 // And copy missing blocks from Src to Dst
252 func (s *ServerRequiredSuite) TestKeepRsync_WithBlobSigning(c *C) {
253         testKeepRsync(c, true, "")
254 }
255
256 // Put some blocks in Src and some more in Dst
257 // Use prefix while doing rsync
258 // And copy missing blocks from Src to Dst
259 func (s *ServerRequiredSuite) TestKeepRsync_WithPrefix(c *C) {
260         data := []byte("test-data-4")
261         hash := fmt.Sprintf("%x", md5.Sum(data))
262
263         testKeepRsync(c, false, hash[0:3])
264 }
265
266 // Put some blocks in Src and some more in Dst
267 // Use prefix not in src while doing rsync
268 // And copy missing blocks from Src to Dst
269 func (s *ServerRequiredSuite) TestKeepRsync_WithNoSuchPrefixInSrc(c *C) {
270         testKeepRsync(c, false, "999")
271 }
272
273 // Put 5 blocks in src. Put 2 of those blocks in dst
274 // Hence there are 3 additional blocks in src
275 // Also, put 2 extra blocks in dst; they are hence only in dst
276 // Run rsync and verify that those 7 blocks are now available in dst
277 func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) {
278         setupRsync(c, enforcePermissions, true)
279
280         prefix = indexPrefix
281
282         // setupTestData
283         setupTestData(c, enforcePermissions, prefix)
284
285         err := performKeepRsync()
286         c.Check(err, IsNil)
287
288         // Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
289         dstIndex, err := getUniqueLocators(kcDst, "")
290         c.Check(err, IsNil)
291
292         if prefix == "" {
293                 for _, locator := range srcLocators {
294                         _, ok := dstIndex[locator]
295                         c.Assert(ok, Equals, true)
296                 }
297         } else {
298                 for _, locator := range srcLocatorsMatchingPrefix {
299                         _, ok := dstIndex[locator]
300                         c.Assert(ok, Equals, true)
301                 }
302         }
303
304         for _, locator := range extraDstLocators {
305                 _, ok := dstIndex[locator]
306                 c.Assert(ok, Equals, true)
307         }
308
309         if prefix == "" {
310                 // all blocks from src and the two extra blocks
311                 c.Assert(len(dstIndex), Equals, len(srcLocators)+len(extraDstLocators))
312         } else {
313                 // one matching prefix, 2 that were initially copied into dst along with src, and the extra blocks
314                 c.Assert(len(dstIndex), Equals, len(srcLocatorsMatchingPrefix)+len(extraDstLocators)+2)
315         }
316 }
317
318 // Setup test data in src and dst.
319 var srcLocators []string
320 var srcLocatorsMatchingPrefix []string
321 var dstLocators []string
322 var extraDstLocators []string
323
324 func setupTestData(c *C, enforcePermissions bool, indexPrefix string) {
325         srcLocators = []string{}
326         srcLocatorsMatchingPrefix = []string{}
327         dstLocators = []string{}
328         extraDstLocators = []string{}
329
330         tomorrow := time.Now().AddDate(0, 0, 1)
331
332         // Put a few blocks in src using kcSrc
333         for i := 0; i < 5; i++ {
334                 data := []byte(fmt.Sprintf("test-data-%d", i))
335                 hash := fmt.Sprintf("%x", md5.Sum(data))
336
337                 hash2, rep, err := kcSrc.PutB(data)
338                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
339                 c.Check(rep, Equals, 2)
340                 c.Check(err, IsNil)
341
342                 getLocator := hash
343                 if enforcePermissions {
344                         getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey))
345                 }
346
347                 reader, blocklen, _, err := kcSrc.Get(getLocator)
348                 c.Check(err, IsNil)
349                 c.Check(blocklen, Equals, int64(11))
350                 all, err := ioutil.ReadAll(reader)
351                 c.Check(all, DeepEquals, data)
352
353                 srcLocators = append(srcLocators, fmt.Sprintf("%s+%d", hash, blocklen))
354                 if strings.HasPrefix(hash, indexPrefix) {
355                         srcLocatorsMatchingPrefix = append(srcLocatorsMatchingPrefix, fmt.Sprintf("%s+%d", hash, blocklen))
356                 }
357         }
358
359         // Put first two of those src blocks in dst using kcDst
360         for i := 0; i < 2; i++ {
361                 data := []byte(fmt.Sprintf("test-data-%d", i))
362                 hash := fmt.Sprintf("%x", md5.Sum(data))
363
364                 hash2, rep, err := kcDst.PutB(data)
365                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
366                 c.Check(rep, Equals, 1)
367                 c.Check(err, IsNil)
368
369                 getLocator := hash
370                 if enforcePermissions {
371                         getLocator = keepclient.SignLocator(getLocator, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
372                 }
373
374                 reader, blocklen, _, err := kcDst.Get(getLocator)
375                 c.Check(err, IsNil)
376                 c.Check(blocklen, Equals, int64(11))
377                 all, err := ioutil.ReadAll(reader)
378                 c.Check(all, DeepEquals, data)
379
380                 dstLocators = append(dstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
381         }
382
383         // Put two more blocks in dst; they are not in src at all
384         for i := 0; i < 2; i++ {
385                 data := []byte(fmt.Sprintf("other-data-%d", i))
386                 hash := fmt.Sprintf("%x", md5.Sum(data))
387
388                 hash2, rep, err := kcDst.PutB(data)
389                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+12(\+.+)?$`, hash))
390                 c.Check(rep, Equals, 1)
391                 c.Check(err, IsNil)
392
393                 getLocator := hash
394                 if enforcePermissions {
395                         getLocator = keepclient.SignLocator(getLocator, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
396                 }
397
398                 reader, blocklen, _, err := kcDst.Get(getLocator)
399                 c.Check(err, IsNil)
400                 c.Check(blocklen, Equals, int64(12))
401                 all, err := ioutil.ReadAll(reader)
402                 c.Check(all, DeepEquals, data)
403
404                 extraDstLocators = append(extraDstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
405         }
406 }
407
408 // Setup rsync using srcKeepServicesJSON with fake keepservers.
409 // Expect error during performKeepRsync due to unreachable src keepservers.
410 func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeSrcKeepservers(c *C) {
411         srcKeepServicesJSON = testKeepServicesJSON
412
413         setupRsync(c, false, false)
414
415         err := performKeepRsync()
416         c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
417 }
418
419 // Setup rsync using dstKeepServicesJSON with fake keepservers.
420 // Expect error during performKeepRsync due to unreachable dst keepservers.
421 func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeDstKeepservers(c *C) {
422         dstKeepServicesJSON = testKeepServicesJSON
423
424         setupRsync(c, false, false)
425
426         err := performKeepRsync()
427         c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
428 }
429
430 // Test rsync with signature error during Get from src.
431 func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorGettingBlockFromSrc(c *C) {
432         setupRsync(c, true, true)
433
434         // put some blocks in src and dst
435         setupTestData(c, true, "")
436
437         // Change blob signing key to a fake key, so that Get from src fails
438         blobSigningKey = "123456789012345678901234yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
439
440         err := performKeepRsync()
441         c.Check(err.Error(), Equals, "Block not found")
442 }
443
444 // Test rsync with error during Put to src.
445 func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorPuttingBlockInDst(c *C) {
446         setupRsync(c, false, true)
447
448         // put some blocks in src and dst
449         setupTestData(c, true, "")
450
451         // Increase Want_replicas on dst to result in insufficient replicas error during Put
452         kcDst.Want_replicas = 2
453
454         err := performKeepRsync()
455         c.Check(err.Error(), Equals, "Could not write sufficient replicas")
456 }
457
458 // Test loadConfig func
459 func (s *ServerRequiredSuite) TestLoadConfig(c *C) {
460         // Setup a src config file
461         srcFile := setupConfigFile(c, "src-config")
462         defer os.Remove(srcFile.Name())
463         srcConfigFile = srcFile.Name()
464
465         // Setup a dst config file
466         dstFile := setupConfigFile(c, "dst-config")
467         defer os.Remove(dstFile.Name())
468         dstConfigFile = dstFile.Name()
469
470         // load configuration from those files
471         err := loadConfig()
472         c.Check(err, IsNil)
473
474         c.Assert(srcConfig.APIHost, Equals, "testhost")
475         c.Assert(srcConfig.APIToken, Equals, "testtoken")
476         c.Assert(srcConfig.APIHostInsecure, Equals, true)
477         c.Assert(srcConfig.ExternalClient, Equals, false)
478
479         c.Assert(dstConfig.APIHost, Equals, "testhost")
480         c.Assert(dstConfig.APIToken, Equals, "testtoken")
481         c.Assert(dstConfig.APIHostInsecure, Equals, true)
482         c.Assert(dstConfig.ExternalClient, Equals, false)
483
484         c.Assert(blobSigningKey, Equals, "abcdefg")
485 }
486
487 // Test loadConfig func without setting up the config files
488 func (s *ServerRequiredSuite) TestLoadConfig_MissingSrcConfig(c *C) {
489         srcConfigFile = ""
490         err := loadConfig()
491         c.Assert(err.Error(), Equals, "-src-config-file must be specified")
492 }
493
494 // Test loadConfig func - error reading src config
495 func (s *ServerRequiredSuite) TestLoadConfig_ErrorLoadingSrcConfig(c *C) {
496         srcConfigFile = "no-such-config-file"
497
498         // load configuration
499         err := loadConfig()
500         c.Assert(strings.HasSuffix(err.Error(), "no such file or directory"), Equals, true)
501 }
502
503 // Test loadConfig func with no dst config file specified
504 func (s *ServerRequiredSuite) TestLoadConfig_MissingDstConfig(c *C) {
505         // Setup a src config file
506         srcFile := setupConfigFile(c, "src-config")
507         defer os.Remove(srcFile.Name())
508         srcConfigFile = srcFile.Name()
509
510         // But do not setup the dst config file
511         dstConfigFile = ""
512
513         // load configuration
514         err := loadConfig()
515         c.Assert(err.Error(), Equals, "-dst-config-file must be specified")
516 }
517
518 // Test loadConfig func
519 func (s *ServerRequiredSuite) TestLoadConfig_ErrorLoadingDstConfig(c *C) {
520         // Setup a src config file
521         srcFile := setupConfigFile(c, "src-config")
522         defer os.Remove(srcFile.Name())
523         srcConfigFile = srcFile.Name()
524
525         // But do not setup the dst config file
526         dstConfigFile = "no-such-config-file"
527
528         // load configuration
529         err := loadConfig()
530         c.Assert(strings.HasSuffix(err.Error(), "no such file or directory"), Equals, true)
531 }
532
533 func setupConfigFile(c *C, name string) *os.File {
534         // Setup a config file
535         file, err := ioutil.TempFile(os.TempDir(), name)
536         c.Check(err, IsNil)
537
538         fileContent := "ARVADOS_API_HOST=testhost\n"
539         fileContent += "ARVADOS_API_TOKEN=testtoken\n"
540         fileContent += "ARVADOS_API_HOST_INSECURE=true\n"
541         fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
542
543         _, err = file.Write([]byte(fileContent))
544         c.Check(err, IsNil)
545
546         return file
547 }