From: radhika Date: Sat, 10 Oct 2015 00:09:17 +0000 (-0400) Subject: 7167: stop rsync operation on any errors during Get or Put operations; add additional... X-Git-Tag: 1.1.0~1310^2~18 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/e1492d82f4326cc58531b92844118b987575846e?ds=sidebyside 7167: stop rsync operation on any errors during Get or Put operations; add additional tests. --- diff --git a/tools/keep-rsync/keep-rsync.go b/tools/keep-rsync/keep-rsync.go index 88caf903b8..226b597e69 100644 --- a/tools/keep-rsync/keep-rsync.go +++ b/tools/keep-rsync/keep-rsync.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "errors" "flag" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/keepclient" @@ -97,7 +98,10 @@ func main() { } // Copy blocks not found in dst from src - performKeepRsync() + err = performKeepRsync() + if err != nil { + log.Fatal("Error while syncing data: %s", err.Error()) + } } var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$") @@ -215,9 +219,9 @@ func performKeepRsync() error { toBeCopied := getMissingLocators(srcIndex, dstIndex) // Copy each missing block to dst - copyBlocksToDst(toBeCopied) + err = copyBlocksToDst(toBeCopied) - return nil + return err } // Get list of unique locators from the specified cluster @@ -268,10 +272,9 @@ func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool } // Copy blocks from src to dst; only those that are missing in dst are copied -func copyBlocksToDst(toBeCopied []string) { +func copyBlocksToDst(toBeCopied []string) error { done := 0 total := len(toBeCopied) - var failed []string for _, locator := range toBeCopied { log.Printf("Getting block %d of %d", done+1, total) @@ -287,33 +290,29 @@ func copyBlocksToDst(toBeCopied []string) { reader, _, _, err := kcSrc.Get(getLocator) if err != nil { log.Printf("Error getting block: %q %v", locator, err) - failed = append(failed, locator) - continue + return err } data, err := ioutil.ReadAll(reader) if err != nil { log.Printf("Error reading block data: %q %v", locator, err) - failed = append(failed, locator) - continue + return err } log.Printf("Copying block: %q", locator) _, rep, err := kcDst.PutB(data) if err != nil { log.Printf("Error putting block data: %q %v", locator, err) - failed = append(failed, locator) - continue + return err } if rep != replications { log.Printf("Failed to put enough number of replicas. Wanted: %d; Put: %d", replications, rep) - failed = append(failed, locator) - continue + return errors.New("Failed to put enough number of replicas") } done++ log.Printf("%.2f%% done", float64(done)/float64(total)*100) } - log.Printf("Successfully copied to destination %d and failed %d out of a total of %d", done, len(failed), total) - log.Printf("Failed blocks %v", failed) + log.Printf("Successfully copied to destination %d blocks.", total) + return nil } diff --git a/tools/keep-rsync/keep-rsync_test.go b/tools/keep-rsync/keep-rsync_test.go index 4561f5d3b5..26e87f088c 100644 --- a/tools/keep-rsync/keep-rsync_test.go +++ b/tools/keep-rsync/keep-rsync_test.go @@ -33,6 +33,9 @@ func (s *ServerRequiredSuite) SetUpTest(c *C) { arvadostest.ResetEnv() srcKeepServicesJSON = "" dstKeepServicesJSON = "" + replications = 0 + prefix = "" + blobSigningKey = "" } func (s *ServerRequiredSuite) TearDownSuite(c *C) { @@ -40,11 +43,13 @@ func (s *ServerRequiredSuite) TearDownSuite(c *C) { arvadostest.StopAPI() } +var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }" + // Testing keep-rsync needs two sets of keep services: src and dst. // The test setup hence tweaks keep-rsync initialization to achieve this. // First invoke initializeKeepRsync and then invoke StartKeepWithParams // to create the keep servers to be used as destination. -func setupRsync(c *C, enforcePermissions bool, overwriteReplications bool) { +func setupRsync(c *C, enforcePermissions bool, overwrite bool) { // srcConfig srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST") srcConfig.APIToken = os.Getenv("ARVADOS_API_TOKEN") @@ -65,27 +70,25 @@ func setupRsync(c *C, enforcePermissions bool, overwriteReplications bool) { // initialize keep-rsync err := initializeKeepRsync() - c.Assert(err, Equals, nil) - - // Create two more keep servers to be used as destination - arvadostest.StartKeepWithParams(true, enforcePermissions) + c.Check(err, IsNil) - // set replications to 1 since those many keep servers were created for dst. - if overwriteReplications { + // Create an additional keep server to be used as destination and reload kcDst + // Set replications to 1 since those many keep servers were created for dst. + if overwrite { + arvadostest.StartKeepWithParams(true, enforcePermissions) replications = 1 - } - // load kcDst - kcDst, err = keepclient.MakeKeepClient(&arvDst) - c.Assert(err, Equals, nil) - kcDst.Want_replicas = 1 + kcDst, err = keepclient.MakeKeepClient(&arvDst) + c.Check(err, IsNil) + kcDst.Want_replicas = 1 + } } // Test readConfigFromFile method func (s *ServerRequiredSuite) TestReadConfigFromFile(c *C) { // Setup a test config file file, err := ioutil.TempFile(os.TempDir(), "config") - c.Assert(err, Equals, nil) + c.Check(err, IsNil) defer os.Remove(file.Name()) fileContent := "ARVADOS_API_HOST=testhost\n" @@ -97,7 +100,7 @@ func (s *ServerRequiredSuite) TestReadConfigFromFile(c *C) { // Invoke readConfigFromFile method with this test filename config, err := readConfigFromFile(file.Name()) - c.Assert(err, Equals, nil) + c.Check(err, IsNil) c.Assert(config.APIHost, Equals, "testhost") c.Assert(config.APIToken, Equals, "testtoken") c.Assert(config.APIHostInsecure, Equals, true) @@ -123,7 +126,7 @@ func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) { c.Check(err, Equals, nil) reader, blocklen, _, err := kcSrc.Get(locatorInSrc) - c.Assert(err, Equals, nil) + c.Check(err, IsNil) c.Check(blocklen, Equals, int64(10)) all, err := ioutil.ReadAll(reader) c.Check(all, DeepEquals, srcData) @@ -138,7 +141,7 @@ func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) { c.Check(err, Equals, nil) reader, blocklen, _, err = kcDst.Get(locatorInDst) - c.Assert(err, Equals, nil) + c.Check(err, IsNil) c.Check(blocklen, Equals, int64(10)) all, err = ioutil.ReadAll(reader) c.Check(all, DeepEquals, dstData) @@ -154,7 +157,7 @@ func (s *ServerRequiredSuite) TestRsyncPutInOne_GetFromOtherShouldFail(c *C) { // Test keep-rsync initialization, with srcKeepServicesJSON func (s *ServerRequiredSuite) TestRsyncInitializeWithKeepServicesJSON(c *C) { - srcKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }" + srcKeepServicesJSON = testKeepServicesJSON setupRsync(c, false, true) @@ -199,7 +202,7 @@ func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShou signedLocator := keepclient.SignLocator(locatorInSrc, arvSrc.ApiToken, tomorrow, []byte(blobSigningKey)) reader, blocklen, _, err := kcSrc.Get(signedLocator) - c.Assert(err, Equals, nil) + c.Check(err, IsNil) c.Check(blocklen, Equals, int64(10)) all, err := ioutil.ReadAll(reader) c.Check(all, DeepEquals, srcData) @@ -216,7 +219,7 @@ func (s *ServerRequiredSuite) TestRsyncWithBlobSigning_PutInOne_GetFromOtherShou signedLocator = keepclient.SignLocator(locatorInDst, arvDst.ApiToken, tomorrow, []byte(blobSigningKey)) reader, blocklen, _, err = kcDst.Get(signedLocator) - c.Assert(err, Equals, nil) + c.Check(err, IsNil) c.Check(blocklen, Equals, int64(10)) all, err = ioutil.ReadAll(reader) c.Check(all, DeepEquals, dstData) @@ -301,7 +304,7 @@ func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) { hash2, rep, err := kcSrc.PutB(data) c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash)) c.Check(rep, Equals, 2) - c.Check(err, Equals, nil) + c.Check(err, IsNil) getLocator := hash if enforcePermissions { @@ -309,7 +312,7 @@ func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) { } reader, blocklen, _, err := kcSrc.Get(getLocator) - c.Assert(err, Equals, nil) + c.Check(err, IsNil) c.Check(blocklen, Equals, int64(11)) all, err := ioutil.ReadAll(reader) c.Check(all, DeepEquals, data) @@ -329,7 +332,7 @@ func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) { hash2, rep, err := kcDst.PutB(data) c.Check(hash2, Matches, fmt.Sprintf(`^%s\+11(\+.+)?$`, hash)) c.Check(rep, Equals, 1) - c.Check(err, Equals, nil) + c.Check(err, IsNil) getLocator := hash if enforcePermissions { @@ -337,7 +340,7 @@ func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) { } reader, blocklen, _, err := kcDst.Get(getLocator) - c.Assert(err, Equals, nil) + c.Check(err, IsNil) c.Check(blocklen, Equals, int64(11)) all, err := ioutil.ReadAll(reader) c.Check(all, DeepEquals, data) @@ -354,7 +357,7 @@ func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) { hash2, rep, err := kcDst.PutB(data) c.Check(hash2, Matches, fmt.Sprintf(`^%s\+12(\+.+)?$`, hash)) c.Check(rep, Equals, 1) - c.Check(err, Equals, nil) + c.Check(err, IsNil) getLocator := hash if enforcePermissions { @@ -362,7 +365,7 @@ func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) { } reader, blocklen, _, err := kcDst.Get(getLocator) - c.Assert(err, Equals, nil) + c.Check(err, IsNil) c.Check(blocklen, Equals, int64(12)) all, err := ioutil.ReadAll(reader) c.Check(all, DeepEquals, data) @@ -371,11 +374,11 @@ func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) { } err := performKeepRsync() - c.Check(err, Equals, nil) + c.Check(err, IsNil) // Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found dstIndex, err := getUniqueLocators(kcDst, "") - c.Check(err, Equals, nil) + c.Check(err, IsNil) if prefix == "" { for _, locator := range srcLocators { @@ -402,3 +405,25 @@ func testKeepRsync(c *C, enforcePermissions bool, indexPrefix string) { c.Assert(len(dstIndex), Equals, len(srcLocatorsMatchingPrefix)+len(extraDstLocators)+2) } } + +// Setup rsync using srcKeepServicesJSON with fake keepservers. +// Expect error during performKeepRsync due to unreachable src keepservers. +func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeSrcKeepservers(c *C) { + srcKeepServicesJSON = testKeepServicesJSON + + setupRsync(c, false, true) + + err := performKeepRsync() + c.Check(err, NotNil) +} + +// Setup rsync using dstKeepServicesJSON with fake keepservers. +// Expect error during performKeepRsync due to unreachable dst keepservers. +func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeDstKeepservers(c *C) { + dstKeepServicesJSON = testKeepServicesJSON + + setupRsync(c, false, false) + + err := performKeepRsync() + c.Check(err, NotNil) +}