From bd1580a11a377270a5a7eed5abc8a6dfe6f9547d Mon Sep 17 00:00:00 2001
From: radhika
Date: Tue, 6 Oct 2015 13:54:41 -0400
Subject: [PATCH] 7167: get index from src and dst and copy any missing blocks
 from src to dst.

---
 tools/keep-rsync/keep-rsync.go      | 143 ++++++++++++++++++++++++++++
 tools/keep-rsync/keep-rsync_test.go |  93 +++++++++++++++++-
 2 files changed, 234 insertions(+), 2 deletions(-)

diff --git a/tools/keep-rsync/keep-rsync.go b/tools/keep-rsync/keep-rsync.go
index 3761cc7e9c..c06fb8a303 100644
--- a/tools/keep-rsync/keep-rsync.go
+++ b/tools/keep-rsync/keep-rsync.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"flag"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -69,6 +70,7 @@ func main() {
 
 	var err error
 
+	// Load config
 	if srcConfigFile == "" {
 		log.Fatal("-src-config-file must be specified.")
 	}
@@ -85,10 +87,17 @@ func main() {
 		log.Fatal("Error reading destination configuration: %s", err.Error())
 	}
 
+	// Initialize keep-rsync
 	err = initializeKeepRsync()
 	if err != nil {
 		log.Fatal("Error configurating keep-rsync: %s", err.Error())
 	}
+
+	// Copy blocks not found in dst from src
+	err = performKeepRsync()
+	if err != nil {
+		log.Fatalf("Error while syncing data: %s", err.Error())
+	}
 }
 
 // Reads config from file
@@ -157,6 +166,140 @@ func initializeKeepRsync() (err error) {
 			return
 		}
 	}
+	kcDst.Want_replicas = replications
 
 	return
 }
+
+// Get unique block locators from src and dst
+// Copy any blocks missing in dst
+// Returns an error if either index cannot be read or any block fails to copy
+func performKeepRsync() error {
+	// Get unique locators from src
+	srcIndex, err := getUniqueLocators(kcSrc, prefix)
+	if err != nil {
+		return err
+	}
+
+	// Get unique locators from dst
+	dstIndex, err := getUniqueLocators(kcDst, prefix)
+	if err != nil {
+		return err
+	}
+
+	// Get list of locators found in src, but missing in dst
+	toBeCopied := getMissingLocators(srcIndex, dstIndex)
+
+	// Copy each missing block to dst
+	return copyBlocksToDst(toBeCopied)
+}
+
+// Get list of unique locators from the specified cluster
+// The index of every keep service in kc.LocalRoots() is read and merged
+func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
+	uniqueLocators := map[string]bool{}
+
+	for uuid := range kc.LocalRoots() {
+		reader, err := kc.GetIndex(uuid, prefix)
+		if err != nil {
+			return nil, err
+		}
+
+		indexBytes, err := ioutil.ReadAll(reader)
+		if err != nil {
+			return nil, err
+		}
+
+		// Parse each index response on its own, so that a response
+		// without a trailing newline cannot run into the next one
+		for _, line := range bytes.Split(indexBytes, []byte("\n")) {
+			if len(line) == 0 {
+				continue
+			}
+
+			// The locator is the first space-separated field; storing it
+			// in a map drops duplicates reported by several services
+			locator := string(bytes.Split(line, []byte(" "))[0])
+			uniqueLocators[locator] = true
+		}
+	}
+
+	return uniqueLocators, nil
+}
+
+// Get list of locators that are in src but not in dst
+func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
+	var missingLocators []string
+	for locator := range srcLocators {
+		if _, ok := dstLocators[locator]; !ok {
+			missingLocators = append(missingLocators, locator)
+		}
+	}
+	return missingLocators
+}
+
+// copyFailedError is returned by copyBlocksToDst when one or more blocks
+// could not be copied; failed lists the locators of those blocks
+type copyFailedError struct {
+	failed []string
+}
+
+func (e copyFailedError) Error() string {
+	return "failed to copy one or more blocks to destination"
+}
+
+// Copy blocks from src to dst; only those that are missing in dst are copied
+// Every block is attempted; failures are logged as they occur and a non-nil
+// error is returned at the end if any block could not be copied
+func copyBlocksToDst(toBeCopied []string) error {
+	done := 0
+	total := len(toBeCopied)
+	var failed []string
+
+	for i, locator := range toBeCopied {
+		// Number blocks by position rather than by success count, so that
+		// the progress message stays correct after a failed block
+		log.Printf("Getting block %d of %d", i+1, total)
+
+		log.Printf("Getting block: %v", locator)
+
+		reader, _, _, err := kcSrc.Get(locator)
+		if err != nil {
+			log.Printf("Error getting block: %q %v", locator, err)
+			failed = append(failed, locator)
+			continue
+		}
+		data, err := ioutil.ReadAll(reader)
+		// Close the reader whether or not the read succeeded
+		reader.Close()
+		if err != nil {
+			log.Printf("Error reading block data: %q %v", locator, err)
+			failed = append(failed, locator)
+			continue
+		}
+
+		log.Printf("Copying block: %q", locator)
+		_, rep, err := kcDst.PutB(data)
+		if err != nil {
+			log.Printf("Error putting block data: %q %v", locator, err)
+			failed = append(failed, locator)
+			continue
+		}
+		// Only too few replicas is a failure; extra replicas are harmless
+		if rep < replications {
+			log.Printf("Failed to put enough number of replicas. Wanted: %d; Put: %d", replications, rep)
+			failed = append(failed, locator)
+			continue
+		}
+
+		done++
+		log.Printf("%.2f%% done", float64(done)/float64(total)*100)
+	}
+
+	log.Printf("Successfully copied to destination %d and failed %d out of a total of %d", done, len(failed), total)
+	if len(failed) > 0 {
+		log.Printf("Failed blocks %v", failed)
+		return copyFailedError{failed: failed}
+	}
+	return nil
+}
diff --git a/tools/keep-rsync/keep-rsync_test.go b/tools/keep-rsync/keep-rsync_test.go
index 97db5717d0..37e4b20f40 100644
--- a/tools/keep-rsync/keep-rsync_test.go
+++ b/tools/keep-rsync/keep-rsync_test.go
@@ -55,6 +55,9 @@ func setupRsync(c *C) {
 	dstConfig["ARVADOS_API_TOKEN"] = os.Getenv("ARVADOS_API_TOKEN")
 	dstConfig["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
 
+	replications = 1
+
+	// Start API and Keep servers
 	arvadostest.StartAPI()
 	arvadostest.StartKeep()
 
@@ -150,7 +153,7 @@ func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
 	c.Check(localRoots != nil, Equals, true)
 
 	foundIt := false
-	for k, _ := range localRoots {
+	for k := range localRoots {
 		if k == "zzzzz-bi6l4-123456789012340" {
 			foundIt = true
 		}
@@ -158,10 +161,96 @@ func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
 	c.Check(foundIt, Equals, true)
 
 	foundIt = false
-	for k, _ := range localRoots {
+	for k := range localRoots {
 		if k == "zzzzz-bi6l4-123456789012341" {
 			foundIt = true
 		}
 	}
 	c.Check(foundIt, Equals, true)
 }
+
+// Put 5 blocks in src. Put 2 of those blocks in dst
+// Hence there are 3 additional blocks in src
+// Also, put 2 extra blocks in dst; they are hence only in dst
+// Run rsync and verify that those 7 blocks are now available in dst
+func (s *ServerRequiredSuite) TestKeepRsync(c *C) {
+	setupRsync(c)
+
+	// Put a few blocks in src using kcSrc
+	var srcLocators []string
+	for i := 0; i < 5; i++ {
+		data := []byte(fmt.Sprintf("test-data-%d", i))
+		hash := fmt.Sprintf("%x", md5.Sum(data))
+
+		hash2, rep, err := kcSrc.PutB(data)
+		c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
+		c.Check(rep, Equals, 2)
+		c.Check(err, Equals, nil)
+
+		reader, blocklen, _, err := kcSrc.Get(hash)
+		c.Assert(err, Equals, nil)
+		c.Check(blocklen, Equals, int64(11))
+		all, err := ioutil.ReadAll(reader)
+		c.Check(err, Equals, nil)
+		c.Check(all, DeepEquals, data)
+
+		srcLocators = append(srcLocators, fmt.Sprintf("%s+%d", hash, blocklen))
+	}
+
+	// Put just two of those blocks in dst using kcDst
+	var dstLocators []string
+	for i := 0; i < 2; i++ {
+		data := []byte(fmt.Sprintf("test-data-%d", i))
+		hash := fmt.Sprintf("%x", md5.Sum(data))
+
+		hash2, rep, err := kcDst.PutB(data)
+		c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
+		c.Check(rep, Equals, 1)
+		c.Check(err, Equals, nil)
+
+		reader, blocklen, _, err := kcDst.Get(hash)
+		c.Assert(err, Equals, nil)
+		c.Check(blocklen, Equals, int64(11))
+		all, err := ioutil.ReadAll(reader)
+		c.Check(err, Equals, nil)
+		c.Check(all, DeepEquals, data)
+
+		dstLocators = append(dstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
+	}
+
+	// Put two more blocks in dst; they are not in src at all
+	var extraDstLocators []string
+	for i := 0; i < 2; i++ {
+		data := []byte(fmt.Sprintf("other-data-%d", i))
+		hash := fmt.Sprintf("%x", md5.Sum(data))
+
+		hash2, rep, err := kcDst.PutB(data)
+		c.Check(hash2, Matches, fmt.Sprintf(`^%s\+12(\+.+)?$`, hash))
+		c.Check(rep, Equals, 1)
+		c.Check(err, Equals, nil)
+
+		reader, blocklen, _, err := kcDst.Get(hash)
+		c.Assert(err, Equals, nil)
+		c.Check(blocklen, Equals, int64(12))
+		all, err := ioutil.ReadAll(reader)
+		c.Check(err, Equals, nil)
+		c.Check(all, DeepEquals, data)
+
+		extraDstLocators = append(extraDstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
+	}
+
+	err := performKeepRsync()
+	c.Check(err, Equals, nil)
+
+	// Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
+	dstIndex, err := getUniqueLocators(kcDst, "")
+	c.Check(err, Equals, nil)
+	for _, locator := range srcLocators {
+		_, ok := dstIndex[locator]
+		c.Assert(ok, Equals, true)
+	}
+	for _, locator := range extraDstLocators {
+		_, ok := dstIndex[locator]
+		c.Assert(ok, Equals, true)
+	}
+}
-- 
2.30.2