Merge branch '7167-keep-rsync-test-setup' into 7167-keep-rsync
[arvados.git] / tools / keep-rsync / keep-rsync_test.go
1 package main
2
3 import (
4         "crypto/md5"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "strings"
9         "testing"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
13         "git.curoverse.com/arvados.git/sdk/go/keepclient"
14
15         . "gopkg.in/check.v1"
16 )
17
18 // Gocheck boilerplate
19 func Test(t *testing.T) {
20         TestingT(t)
21 }
22
23 // Gocheck boilerplate
24 var _ = Suite(&ServerRequiredSuite{})
25
26 // Tests that require the Keep server running
27 type ServerRequiredSuite struct{}
28
29 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
30 }
31
32 func (s *ServerRequiredSuite) SetUpTest(c *C) {
33         arvadostest.ResetEnv()
34         srcKeepServicesJSON = ""
35         dstKeepServicesJSON = ""
36 }
37
38 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
39         arvadostest.StopKeep()
40         arvadostest.StopAPI()
41 }
42
43 // Testing keep-rsync needs two sets of keep services: src and dst.
44 // The test setup hence tweaks keep-rsync initialization to achieve this.
45 // First invoke initializeKeepRsync and then invoke StartKeepWithParams
46 // to create the keep servers to be used as destination.
47 func setupRsync(c *C, enforcePermissions bool, overwriteReplications bool) {
48         // srcConfig
49         srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
50         srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
51         srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
52
53         // dstConfig
54         dstConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
55         dstConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN")
56         dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
57
58         if enforcePermissions {
59                 blobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
60         }
61
62         // Start API and Keep servers
63         arvadostest.StartAPI()
64         arvadostest.StartKeepWithParams(false, enforcePermissions)
65
66         // initialize keep-rsync
67         err := initializeKeepRsync()
68         c.Assert(err, Equals, nil)
69
70         // Create two more keep servers to be used as destination
71         arvadostest.StartKeepWithParams(true, enforcePermissions)
72
73         // set replications to 1 since those many keep servers were created for dst.
74         if overwriteReplications {
75                 replications = 1
76         }
77
78         // load kcDst
79         kcDst, err = keepclient.MakeKeepClient(&arvDst)
80         c.Assert(err, Equals, nil)
81         kcDst.Want_replicas = 1
82 }
83
84 // Test readConfigFromFile method
85 func (s *ServerRequiredSuite) TestReadConfigFromFile(c *C) {
86         // Setup a test config file
87         file, err := ioutil.TempFile(os.TempDir(), "config")
88         c.Assert(err, Equals, nil)
89         defer os.Remove(file.Name())
90
91         fileContent := "ARVADOS_API_HOST=testhost\n"
92         fileContent += "ARVADOS_API_TOKEN=testtoken\n"
93         fileContent += "ARVADOS_API_HOST_INSECURE=true\n"
94         fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
95
96         _, err = file.Write([]byte(fileContent))
97
98         // Invoke readConfigFromFile method with this test filename
99         config, err := readConfigFromFile(file.Name())
100         c.Assert(err, Equals, nil)
101         c.Assert(config.APIHost, Equals, "testhost")
102         c.Assert(config.APIToken, Equals, "testtoken")
103         c.Assert(config.APIHostInsecure, Equals, true)
104         c.Assert(config.ExternalClient, Equals, false)
105         c.Assert(blobSigningKey, Equals, "abcdefg")
106 }
107
108 // Test keep-rsync initialization, with src and dst keep servers.
109 // Do a Put and Get in src, both of which should succeed.
110 // Do a Put and Get in dst, both of which should succeed.
111 // Do a Get in dst for the src hash, which should raise block not found error.
112 // Do a Get in src for the dst hash, which should raise block not found error.
113 func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) {
114         setupRsync(c, false, true)
115
116         // Put a block in src using kcSrc and Get it
117         srcData := []byte("test-data1")
118         locatorInSrc := fmt.Sprintf("%x", md5.Sum(srcData))
119
120         hash, rep, err := kcSrc.PutB(srcData)
121         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInSrc))
122         c.Check(rep, Equals, 2)
123         c.Check(err, Equals, nil)
124
125         reader, blocklen, _, err := kcSrc.Get(locatorInSrc)
126         c.Assert(err, Equals, nil)
127         c.Check(blocklen, Equals, int64(10))
128         all, err := ioutil.ReadAll(reader)
129         c.Check(all, DeepEquals, srcData)
130
131         // Put a different block in src using kcSrc and Get it
132         dstData := []byte("test-data2")
133         locatorInDst := fmt.Sprintf("%x", md5.Sum(dstData))
134
135         hash, rep, err = kcDst.PutB(dstData)
136         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInDst))
137         c.Check(rep, Equals, 1)
138         c.Check(err, Equals, nil)
139
140         reader, blocklen, _, err = kcDst.Get(locatorInDst)
141         c.Assert(err, Equals, nil)
142         c.Check(blocklen, Equals, int64(10))
143         all, err = ioutil.ReadAll(reader)
144         c.Check(all, DeepEquals, dstData)
145
146         // Get srcLocator using kcDst should fail with Not Found error
147         _, _, _, err = kcDst.Get(locatorInSrc)
148         c.Assert(err.Error(), Equals, "Block not found")
149
150         // Get dstLocator using kcSrc should fail with Not Found error
151         _, _, _, err = kcSrc.Get(locatorInDst)
152         c.Assert(err.Error(), Equals, "Block not found")
153 }
154
155 // Test keep-rsync initialization, with srcKeepServicesJSON
156 func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
157         srcKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
158
159         setupRsync(c, false, true)
160
161         localRoots := kcSrc.LocalRoots()
162         c.Check(localRoots != nil, Equals, true)
163
164         foundIt := false
165         for k := range localRoots {
166                 if k == "zzzzz-bi6l4-123456789012340" {
167                         foundIt = true
168                 }
169         }
170         c.Check(foundIt, Equals, true)
171
172         foundIt = false
173         for k := range localRoots {
174                 if k == "zzzzz-bi6l4-123456789012341" {
175                         foundIt = true
176                 }
177         }
178         c.Check(foundIt, Equals, true)
179 }
180
181 // Test keep-rsync initialization, with src and dst keep servers with blobSigningKey.
182 // Do a Put and Get in src, both of which should succeed.
183 // Do a Put and Get in dst, both of which should succeed.
184 // Do a Get in dst for the src hash, which should raise block not found error.
185 // Do a Get in src for the dst hash, which should raise block not found error.
186 func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShouldFail(c *C) {
187         setupRsync(c, true, true)
188
189         // Put a block in src using kcSrc and Get it
190         srcData := []byte("test-data1")
191         locatorInSrc := fmt.Sprintf("%x", md5.Sum(srcData))
192
193         hash, rep, err := kcSrc.PutB(srcData)
194         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInSrc))
195         c.Check(rep, Equals, 2)
196         c.Check(err, Equals, nil)
197
198         tomorrow := time.Now().AddDate(0, 0, 1)
199         signedLocator := keepclient.SignLocator(locatorInSrc, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey))
200
201         reader, blocklen, _, err := kcSrc.Get(signedLocator)
202         c.Assert(err, Equals, nil)
203         c.Check(blocklen, Equals, int64(10))
204         all, err := ioutil.ReadAll(reader)
205         c.Check(all, DeepEquals, srcData)
206
207         // Put a different block in src using kcSrc and Get it
208         dstData := []byte("test-data2")
209         locatorInDst := fmt.Sprintf("%x", md5.Sum(dstData))
210
211         hash, rep, err = kcDst.PutB(dstData)
212         c.Check(hash, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, locatorInDst))
213         c.Check(rep, Equals, 1)
214         c.Check(err, Equals, nil)
215
216         signedLocator = keepclient.SignLocator(locatorInDst, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
217
218         reader, blocklen, _, err = kcDst.Get(signedLocator)
219         c.Assert(err, Equals, nil)
220         c.Check(blocklen, Equals, int64(10))
221         all, err = ioutil.ReadAll(reader)
222         c.Check(all, DeepEquals, dstData)
223
224         // Get srcLocator using kcDst should fail with Not Found error
225         signedLocator = keepclient.SignLocator(locatorInSrc, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
226         _, _, _, err = kcDst.Get(locatorInSrc)
227         c.Assert(err.Error(), Equals, "Block not found")
228
229         // Get dstLocator using kcSrc should fail with Not Found error
230         signedLocator = keepclient.SignLocator(locatorInDst, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey))
231         _, _, _, err = kcSrc.Get(locatorInDst)
232         c.Assert(err.Error(), Equals, "Block not found")
233 }
234
235 // Test keep-rsync initialization with default replications count
236 func (s *ServerRequiredSuite) TestInitializeRsyncDefaultReplicationsCount(c *C) {
237         setupRsync(c, false, false)
238
239         // Must have got default replications value as 2 from dst discovery document
240         c.Assert(replications, Equals, 2)
241 }
242
243 // Test keep-rsync initialization with replications count argument
244 func (s *ServerRequiredSuite) TestInitializeRsyncReplicationsCount(c *C) {
245         // First set replications to 3 to mimic passing input argument
246         replications = 3
247
248         setupRsync(c, false, false)
249
250         // Since replications value is provided, default is not used
251         c.Assert(replications, Equals, 3)
252 }
253
254 // Put some blocks in Src and some more in Dst
255 // And copy missing blocks from Src to Dst
256 func (s *ServerRequiredSuite) TestKeepRsync(c *C) {
257         testKeepRsync(c, false, "")
258 }
259
260 // Put some blocks in Src and some more in Dst with blob signing enabled.
261 // And copy missing blocks from Src to Dst
262 func (s *ServerRequiredSuite) TestKeepRsync_WithBlobSigning(c *C) {
263         testKeepRsync(c, true, "")
264 }
265
266 // Put some blocks in Src and some more in Dst
267 // Use prefix while doing rsync
268 // And copy missing blocks from Src to Dst
269 func (s *ServerRequiredSuite) TestKeepRsync_WithPrefix(c *C) {
270         data := []byte("test-data-4")
271         hash := fmt.Sprintf("%x", md5.Sum(data))
272
273         testKeepRsync(c, false, hash[0:3])
274 }
275
276 // Put some blocks in Src and some more in Dst
277 // Use prefix not in src while doing rsync
278 // And copy missing blocks from Src to Dst
279 func (s *ServerRequiredSuite) TestKeepRsync_WithNoSuchPrefixInSrc(c *C) {
280         testKeepRsync(c, false, "999")
281 }
282
283 // Put 5 blocks in src. Put 2 of those blocks in dst
284 // Hence there are 3 additional blocks in src
285 // Also, put 2 extra blocks in dst; they are hence only in dst
286 // Run rsync and verify that those 7 blocks are now available in dst
287 func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) {
288         setupRsync(c, enforcePermissions, true)
289
290         prefix = indexPrefix
291
292         tomorrow := time.Now().AddDate(0, 0, 1)
293
294         // Put a few blocks in src using kcSrc
295         var srcLocators []string
296         var srcLocatorsMatchingPrefix []string
297         for i := 0; i < 5; i++ {
298                 data := []byte(fmt.Sprintf("test-data-%d", i))
299                 hash := fmt.Sprintf("%x", md5.Sum(data))
300
301                 hash2, rep, err := kcSrc.PutB(data)
302                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
303                 c.Check(rep, Equals, 2)
304                 c.Check(err, Equals, nil)
305
306                 getLocator := hash
307                 if enforcePermissions {
308                         getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey))
309                 }
310
311                 reader, blocklen, _, err := kcSrc.Get(getLocator)
312                 c.Assert(err, Equals, nil)
313                 c.Check(blocklen, Equals, int64(11))
314                 all, err := ioutil.ReadAll(reader)
315                 c.Check(all, DeepEquals, data)
316
317                 srcLocators = append(srcLocators, fmt.Sprintf("%s+%d", hash, blocklen))
318                 if strings.HasPrefix(hash, indexPrefix) {
319                         srcLocatorsMatchingPrefix = append(srcLocatorsMatchingPrefix, fmt.Sprintf("%s+%d", hash, blocklen))
320                 }
321         }
322
323         // Put first two of those src blocks in dst using kcDst
324         var dstLocators []string
325         for i := 0; i < 2; i++ {
326                 data := []byte(fmt.Sprintf("test-data-%d", i))
327                 hash := fmt.Sprintf("%x", md5.Sum(data))
328
329                 hash2, rep, err := kcDst.PutB(data)
330                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
331                 c.Check(rep, Equals, 1)
332                 c.Check(err, Equals, nil)
333
334                 getLocator := hash
335                 if enforcePermissions {
336                         getLocator = keepclient.SignLocator(getLocator, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
337                 }
338
339                 reader, blocklen, _, err := kcDst.Get(getLocator)
340                 c.Assert(err, Equals, nil)
341                 c.Check(blocklen, Equals, int64(11))
342                 all, err := ioutil.ReadAll(reader)
343                 c.Check(all, DeepEquals, data)
344
345                 dstLocators = append(dstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
346         }
347
348         // Put two more blocks in dst; they are not in src at all
349         var extraDstLocators []string
350         for i := 0; i < 2; i++ {
351                 data := []byte(fmt.Sprintf("other-data-%d", i))
352                 hash := fmt.Sprintf("%x", md5.Sum(data))
353
354                 hash2, rep, err := kcDst.PutB(data)
355                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+12(\+.+)?$`, hash))
356                 c.Check(rep, Equals, 1)
357                 c.Check(err, Equals, nil)
358
359                 getLocator := hash
360                 if enforcePermissions {
361                         getLocator = keepclient.SignLocator(getLocator, arvDst.ApiToken, tomorrow, []byte(blobSigningKey))
362                 }
363
364                 reader, blocklen, _, err := kcDst.Get(getLocator)
365                 c.Assert(err, Equals, nil)
366                 c.Check(blocklen, Equals, int64(12))
367                 all, err := ioutil.ReadAll(reader)
368                 c.Check(all, DeepEquals, data)
369
370                 extraDstLocators = append(extraDstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
371         }
372
373         err := performKeepRsync()
374         c.Check(err, Equals, nil)
375
376         // Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
377         dstIndex, err := getUniqueLocators(kcDst, "")
378         c.Check(err, Equals, nil)
379
380         if prefix == "" {
381                 for _, locator := range srcLocators {
382                         _, ok := dstIndex[locator]
383                         c.Assert(ok, Equals, true)
384                 }
385         } else {
386                 for _, locator := range srcLocatorsMatchingPrefix {
387                         _, ok := dstIndex[locator]
388                         c.Assert(ok, Equals, true)
389                 }
390         }
391
392         for _, locator := range extraDstLocators {
393                 _, ok := dstIndex[locator]
394                 c.Assert(ok, Equals, true)
395         }
396
397         if prefix == "" {
398                 // all blocks from src and the two extra blocks
399                 c.Assert(len(dstIndex), Equals, len(srcLocators)+len(extraDstLocators))
400         } else {
401                 // one matching prefix, 2 that were initially copied into dst along with src, and the extra blocks
402                 c.Assert(len(dstIndex), Equals, len(srcLocatorsMatchingPrefix)+len(extraDstLocators)+2)
403         }
404 }