7167: get index from src and dst and copy any missing blocks from src to dst.
authorradhika <radhika@curoverse.com>
Tue, 6 Oct 2015 17:54:41 +0000 (13:54 -0400)
committerradhika <radhika@curoverse.com>
Tue, 6 Oct 2015 17:54:41 +0000 (13:54 -0400)
tools/keep-rsync/keep-rsync.go
tools/keep-rsync/keep-rsync_test.go

index 3761cc7e9c9d154b4a5d061c5e98c85b2b0260e1..c06fb8a303e3d5ead97d77a12ddeafdf24979a94 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bytes"
        "flag"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -69,6 +70,7 @@ func main() {
 
        var err error
 
+       // Load config
        if srcConfigFile == "" {
                log.Fatal("-src-config-file must be specified.")
        }
@@ -85,10 +87,14 @@ func main() {
                log.Fatal("Error reading destination configuration: %s", err.Error())
        }
 
+       // Initialize keep-rsync
        err = initializeKeepRsync()
        if err != nil {
                log.Fatal("Error configurating keep-rsync: %s", err.Error())
        }
+
+       // Copy blocks not found in dst from src
+       performKeepRsync()
 }
 
 // Reads config from file
@@ -157,6 +163,123 @@ func initializeKeepRsync() (err error) {
                        return
                }
        }
+       kcDst.Want_replicas = replications
 
        return
 }
+
+// Get unique block locators from src and dst
+// Copy any blocks missing in dst
+func performKeepRsync() error {
+       // Get unique locators from src
+       srcIndex, err := getUniqueLocators(kcSrc, prefix)
+       if err != nil {
+               return err
+       }
+
+       // Get unique locators from dst
+       dstIndex, err := getUniqueLocators(kcDst, prefix)
+       if err != nil {
+               return err
+       }
+
+       // Get list of locators found in src, but missing in dst
+       toBeCopied := getMissingLocators(srcIndex, dstIndex)
+
+       // Copy each missing block to dst
+       copyBlocksToDst(toBeCopied)
+
+       return nil
+}
+
+// Get list of unique locators from the specified cluster
+func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
+       var indexBytes []byte
+
+       for uuid := range kc.LocalRoots() {
+               reader, err := kc.GetIndex(uuid, prefix)
+               if err != nil {
+                       return nil, err
+               }
+
+               var readBytes []byte
+               readBytes, err = ioutil.ReadAll(reader)
+               if err != nil {
+                       return nil, err
+               }
+
+               indexBytes = append(indexBytes, readBytes...)
+       }
+
+       // Got index; Now dedup it
+       locators := bytes.Split(indexBytes, []byte("\n"))
+
+       uniqueLocators := map[string]bool{}
+       for _, loc := range locators {
+               if len(loc) == 0 {
+                       continue
+               }
+
+               locator := string(bytes.Split(loc, []byte(" "))[0])
+               if _, ok := uniqueLocators[locator]; !ok {
+                       uniqueLocators[locator] = true
+               }
+       }
+       return uniqueLocators, nil
+}
+
+// Get list of locators that are in src but not in dst
+func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
+       var missingLocators []string
+       for locator := range srcLocators {
+               if _, ok := dstLocators[locator]; !ok {
+                       missingLocators = append(missingLocators, locator)
+               }
+       }
+       return missingLocators
+}
+
+// Copy blocks from src to dst; only those that are missing in dst are copied
+func copyBlocksToDst(toBeCopied []string) {
+       done := 0
+       total := len(toBeCopied)
+       var failed []string
+
+       for _, locator := range toBeCopied {
+               log.Printf("Getting block %d of %d", done+1, total)
+
+               log.Printf("Getting block: %v", locator)
+
+               reader, _, _, err := kcSrc.Get(locator)
+               if err != nil {
+                       log.Printf("Error getting block: %q %v", locator, err)
+                       failed = append(failed, locator)
+                       continue
+               }
+               data, err := ioutil.ReadAll(reader)
+               if err != nil {
+                       log.Printf("Error reading block data: %q %v", locator, err)
+                       failed = append(failed, locator)
+                       continue
+               }
+
+               log.Printf("Copying block: %q", locator)
+               _, rep, err := kcDst.PutB(data)
+               if err != nil {
+                       log.Printf("Error putting block data: %q %v", locator, err)
+                       failed = append(failed, locator)
+                       continue
+               }
+               if rep != replications {
+                       log.Printf("Failed to put enough number of replicas. Wanted: %d; Put: %d", replications, rep)
+                       failed = append(failed, locator)
+                       continue
+               }
+
+               done++
+               log.Printf("%.2f%% done", float64(done)/float64(total)*100)
+       }
+
+       log.Printf("Successfully copied to destination %d and failed %d out of a total of %d", done, len(failed), total)
+       log.Printf("Failed blocks %v", failed)
+}
index 97db5717d0dbb6d7f0cff8369a3cb8d22a45d669..37e4b20f406122f4d5616051b4306a3da1b95d47 100644 (file)
@@ -55,6 +55,9 @@ func setupRsync(c *C) {
        dstConfig["ARVADOS_API_TOKEN"] = os.Getenv("ARVADOS_API_TOKEN")
        dstConfig["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
 
+       replications = 1
+
+       // Start API and Keep servers
        arvadostest.StartAPI()
        arvadostest.StartKeep()
 
@@ -150,7 +153,7 @@ func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
        c.Check(localRoots != nil, Equals, true)
 
        foundIt := false
-       for k, _ := range localRoots {
+       for k := range localRoots {
                if k == "zzzzz-bi6l4-123456789012340" {
                        foundIt = true
                }
@@ -158,10 +161,93 @@ func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) {
        c.Check(foundIt, Equals, true)
 
        foundIt = false
-       for k, _ := range localRoots {
+       for k := range localRoots {
                if k == "zzzzz-bi6l4-123456789012341" {
                        foundIt = true
                }
        }
        c.Check(foundIt, Equals, true)
 }
+
+// Put 5 blocks in src. Put 2 of those blocks in dst
+// Hence there are 3 additional blocks in src
+// Also, put 2 extra blocks in dts; they are hence only in dst
+// Run rsync and verify that those 7 blocks are now available in dst
+func (s *ServerRequiredSuite) TestKeepRsync(c *C) {
+       setupRsync(c)
+
+       // Put a few blocks in src using kcSrc
+       var srcLocators []string
+       for i := 0; i < 5; i++ {
+               data := []byte(fmt.Sprintf("test-data-%d", i))
+               hash := fmt.Sprintf("%x", md5.Sum(data))
+
+               hash2, rep, err := kcSrc.PutB(data)
+               c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
+               c.Check(rep, Equals, 2)
+               c.Check(err, Equals, nil)
+
+               reader, blocklen, _, err := kcSrc.Get(hash)
+               c.Assert(err, Equals, nil)
+               c.Check(blocklen, Equals, int64(11))
+               all, err := ioutil.ReadAll(reader)
+               c.Check(all, DeepEquals, data)
+
+               srcLocators = append(srcLocators, fmt.Sprintf("%s+%d", hash, blocklen))
+       }
+
+       // Put just two of those blocks in dst using kcDst
+       var dstLocators []string
+       for i := 0; i < 2; i++ {
+               data := []byte(fmt.Sprintf("test-data-%d", i))
+               hash := fmt.Sprintf("%x", md5.Sum(data))
+
+               hash2, rep, err := kcDst.PutB(data)
+               c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash))
+               c.Check(rep, Equals, 1)
+               c.Check(err, Equals, nil)
+
+               reader, blocklen, _, err := kcDst.Get(hash)
+               c.Assert(err, Equals, nil)
+               c.Check(blocklen, Equals, int64(11))
+               all, err := ioutil.ReadAll(reader)
+               c.Check(all, DeepEquals, data)
+
+               dstLocators = append(dstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
+       }
+
+       // Put two more blocks in dst; they are not in src at all
+       var extraDstLocators []string
+       for i := 0; i < 2; i++ {
+               data := []byte(fmt.Sprintf("other-data-%d", i))
+               hash := fmt.Sprintf("%x", md5.Sum(data))
+
+               hash2, rep, err := kcDst.PutB(data)
+               c.Check(hash2, Matches, fmt.Sprintf(`^%s\+12(\+.+)?$`, hash))
+               c.Check(rep, Equals, 1)
+               c.Check(err, Equals, nil)
+
+               reader, blocklen, _, err := kcDst.Get(hash)
+               c.Assert(err, Equals, nil)
+               c.Check(blocklen, Equals, int64(12))
+               all, err := ioutil.ReadAll(reader)
+               c.Check(all, DeepEquals, data)
+
+               extraDstLocators = append(extraDstLocators, fmt.Sprintf("%s+%d", hash, blocklen))
+       }
+
+       err := performKeepRsync()
+       c.Check(err, Equals, nil)
+
+       // Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
+       dstIndex, err := getUniqueLocators(kcDst, "")
+       c.Check(err, Equals, nil)
+       for _, locator := range srcLocators {
+               _, ok := dstIndex[locator]
+               c.Assert(ok, Equals, true)
+       }
+       for _, locator := range extraDstLocators {
+               _, ok := dstIndex[locator]
+               c.Assert(ok, Equals, true)
+       }
+}