From 10cdbe63e7b41ee773261c904e2913d2b7bad034 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Sun, 23 Jun 2024 08:52:34 -0400 Subject: [PATCH] 21891: Skip extra serialization step. In copy-by-reference, copy everything to one collfs, instead of copying each subcollection to its own temporary collfs, concatenating the manifests, and then loading that into a new collfs. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/crunchrun/copier.go | 72 ++++++++++++++++-------------------- lib/crunchrun/copier_test.go | 30 ++++++++++----- 2 files changed, 52 insertions(+), 50 deletions(-) diff --git a/lib/crunchrun/copier.go b/lib/crunchrun/copier.go index afdd686919..b26562a72c 100644 --- a/lib/crunchrun/copier.go +++ b/lib/crunchrun/copier.go @@ -41,8 +41,8 @@ type filetodo struct { // copied from the local filesystem. // // Symlinks to mounted collections, and any collections mounted under -// ctrOutputDir, are copied by transforming the relevant parts of the -// existing manifests, without moving any data around. +// ctrOutputDir, are copied by reference, without moving any data +// around. // // Symlinks to other parts of the container's filesystem result in // errors. @@ -61,37 +61,40 @@ type copier struct { secretMounts map[string]arvados.Mount logger printfer - dirs []string - files []filetodo - manifest string + dirs []string + files []filetodo + staged arvados.CollectionFileSystem manifestCache map[string]string } // Copy copies data as needed, and returns a new manifest. +// +// Copy should not be called more than once. func (cp *copier) Copy() (string, error) { - err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true) + var err error + cp.staged, err = (&arvados.Collection{}).FileSystem(cp.client, cp.keepClient) if err != nil { - return "", fmt.Errorf("error scanning files to copy to output: %v", err) + return "", fmt.Errorf("error creating Collection.FileSystem: %v", err) } - collfs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient) + err = cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true) if err != nil { - return "", fmt.Errorf("error creating Collection.FileSystem: %v", err) + return "", fmt.Errorf("error scanning files to copy to output: %v", err) } - // Remove files/dirs that don't match globs (the ones that - // were added during cp.walkMount() by copying subtree - // manifests into cp.manifest). - err = cp.applyGlobsToCollectionFS(collfs) + // Remove files/dirs that don't match globs (the files/dirs + // that were added during cp.walkMount() by copying subtree + // manifests into cp.staged). + err = cp.applyGlobsToStaged() if err != nil { return "", fmt.Errorf("error while removing non-matching files from output collection: %w", err) } - // Remove files/dirs that don't match globs (the ones that are - // stored on the local filesystem and would need to be copied - // in copyFile() below). + // Remove files/dirs that don't match globs (the files/dirs + // that are stored on the local filesystem and would need to + // be copied in copyFile() below). cp.applyGlobsToFilesAndDirs() for _, d := range cp.dirs { - err = collfs.Mkdir(d, 0777) + err = cp.staged.Mkdir(d, 0777) if err != nil && err != os.ErrExist { return "", fmt.Errorf("error making directory %q in output collection: %v", d, err) } @@ -106,20 +109,20 @@ func (cp *copier) Copy() (string, error) { // open so f's data can be packed with it). dir, _ := filepath.Split(f.dst) if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE { - if err := collfs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil { + if err := cp.staged.Flush("/"+lastparentdir, dir != lastparentdir); err != nil { return "", fmt.Errorf("error flushing output collection file data: %v", err) } unflushed = 0 } lastparentdir = dir - n, err := cp.copyFile(collfs, f) + n, err := cp.copyFile(cp.staged, f) if err != nil { return "", fmt.Errorf("error copying file %q into output collection: %v", f, err) } unflushed += n } - return collfs.MarshalManifest(".") + return cp.staged.MarshalManifest(".") } func (cp *copier) matchGlobs(path string, isDir bool) bool { @@ -210,15 +213,15 @@ func (cp *copier) applyGlobsToFilesAndDirs() { cp.files = keepfiles } -// Delete files in collfs that do not match cp.globs. Also delete +// Delete files in cp.staged that do not match cp.globs. Also delete // directories that are empty (after deleting non-matching files) and // do not match cp.globs themselves. -func (cp *copier) applyGlobsToCollectionFS(collfs arvados.CollectionFileSystem) error { +func (cp *copier) applyGlobsToStaged() error { if len(cp.globs) == 0 { return nil } include := make(map[string]bool) - err := fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error { + err := fs.WalkDir(arvados.FS(cp.staged), "", func(path string, ent fs.DirEntry, err error) error { if cp.matchGlobs(path, ent.IsDir()) { for i, c := range path { if i > 0 && c == '/' { @@ -232,12 +235,12 @@ func (cp *copier) applyGlobsToCollectionFS(collfs arvados.CollectionFileSystem) if err != nil { return err } - err = fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error { + err = fs.WalkDir(arvados.FS(cp.staged), "", func(path string, ent fs.DirEntry, err error) error { if err != nil || path == "" { return err } if !include[path] { - err := collfs.RemoveAll(path) + err := cp.staged.RemoveAll(path) if err != nil { return err } @@ -306,7 +309,7 @@ func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, return n, dst.Close() } -// Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an +// Add to cp.staged, cp.files, and cp.dirs so as to copy src (an // absolute path in the container's filesystem) to dest (an absolute // path in the output collection, or "" for output root). // @@ -409,29 +412,16 @@ func (cp *copier) copyFromCollection(dest string, coll *arvados.Collection, srcR if err != nil { return err } - tmpfs, err = (&arvados.Collection{}).FileSystem(cp.client, cp.keepClient) - if err != nil { - return err - } // Create ancestors of dest, if necessary. for i, c := range dest { if i > 0 && c == '/' { - err = tmpfs.Mkdir(dest[:i], 0777) + err = cp.staged.Mkdir(dest[:i], 0777) if err != nil && !os.IsExist(err) { return err } } } - err = arvados.Splice(tmpfs, dest, snap) - if err != nil { - return err - } - mtxt, err := tmpfs.MarshalManifest(".") - if err != nil { - return err - } - cp.manifest += mtxt - return nil + return arvados.Splice(cp.staged, dest, snap) } func (cp *copier) walkMountsBelow(dest, src string) error { diff --git a/lib/crunchrun/copier_test.go b/lib/crunchrun/copier_test.go index 6baa4da7cb..3348a879a7 100644 --- a/lib/crunchrun/copier_test.go +++ b/lib/crunchrun/copier_test.go @@ -37,6 +37,8 @@ func (s *copierSuite) SetUpTest(c *check.C) { c.Assert(err, check.IsNil) kc, err := keepclient.MakeKeepClient(cl) c.Assert(err, check.IsNil) + collfs, err := (&arvados.Collection{}).FileSystem(arvados.NewClientFromEnv(), kc) + c.Assert(err, check.IsNil) s.cp = copier{ client: arvados.NewClientFromEnv(), @@ -50,6 +52,7 @@ func (s *copierSuite) SetUpTest(c *check.C) { "/secret_text": {Kind: "text", Content: "xyzzy"}, }, logger: &logrus.Logger{Out: &s.log, Formatter: &logrus.TextFormatter{}, Level: logrus.InfoLevel}, + staged: collfs, } } @@ -148,7 +151,16 @@ func (s *copierSuite) TestSymlinkToMountedCollection(c *check.C) { err = s.cp.walkMount("", s.cp.ctrOutputDir, 10, true) c.Check(err, check.IsNil) - c.Check(s.cp.manifest, check.Matches, `(?ms)\./l_dir acbd\S+ 0:3:foo\n\. acbd\S+ 0:3:l_file\n\. 37b5\S+ 0:3:l_file_w\n`) + s.checkStagedFile(c, "l_dir/foo", 3) + s.checkStagedFile(c, "l_file", 3) + s.checkStagedFile(c, "l_file_w", 3) +} + +func (s *copierSuite) checkStagedFile(c *check.C, path string, size int64) { + fi, err := s.cp.staged.Stat(path) + if c.Check(err, check.IsNil) { + c.Check(fi.Size(), check.Equals, size) + } } func (s *copierSuite) TestSymlink(c *check.C) { @@ -336,12 +348,10 @@ func (s *copierSuite) testCopyFromLargeCollection(c *check.C, writable bool) { c.Log(s.log.String()) // Check some files to ensure they were copied properly. - fs, err := (&arvados.Collection{ManifestText: s.cp.manifest}).FileSystem(s.cp.client, s.cp.keepClient) - c.Assert(err, check.IsNil) for i := 0; i < 100; i += 13 { for j := 0; j < 100; j += 17 { fnm := fmt.Sprintf("/fakecollection/dir%d/dir%d/file%d", i, j, j) - _, err := fs.Stat(fnm) + _, err := s.cp.staged.Stat(fnm) c.Assert(err, check.IsNil, check.Commentf("%s", fnm)) } } @@ -405,8 +415,10 @@ func (s *copierSuite) TestMountBelowExcludedByGlob(c *check.C) { c.Check(s.cp.files, check.DeepEquals, []filetodo{ {src: s.cp.hostOutputDir + "/include/includew/foo", dst: "/include/includew/foo", size: 3}, }) - c.Check(s.cp.manifest, check.Matches, `(?ms).*\./include/includer .*`) - c.Check(s.cp.manifest, check.Not(check.Matches), `(?ms).*exclude.*`) + manifest, err := s.cp.staged.MarshalManifest(".") + c.Assert(err, check.IsNil) + c.Check(manifest, check.Matches, `(?ms).*\./include/includer .*`) + c.Check(manifest, check.Not(check.Matches), `(?ms).*exclude.*`) c.Check(s.log.String(), check.Matches, `(?ms).*not copying \\"exclude/excluder\\".*`) c.Check(s.log.String(), check.Matches, `(?ms).*not copying \\"nonexistent/collection\\".*`) } @@ -420,7 +432,7 @@ func (s *copierSuite) writeFileInOutputDir(c *check.C, path, data string) { } // applyGlobsToFilesAndDirs uses the same glob-matching code as -// applyGlobsToCollectionFS, so we don't need to test all of the same +// applyGlobsToStaged, so we don't need to test all of the same // glob-matching behavior covered in TestApplyGlobsToCollectionFS. We // do need to check that (a) the glob is actually being used to filter // out files, and (b) non-matching dirs still included if and only if @@ -582,8 +594,8 @@ func (s *copierSuite) TestApplyGlobsToCollectionFS(c *check.C) { c.Logf("=== globs: %q", trial.globs) collfs, err := (&arvados.Collection{ManifestText: ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:bar 0:0:baz/quux 0:0:baz/parent1/item1\n"}).FileSystem(nil, nil) c.Assert(err, check.IsNil) - cp := copier{globs: trial.globs} - err = cp.applyGlobsToCollectionFS(collfs) + cp := copier{globs: trial.globs, staged: collfs} + err = cp.applyGlobsToStaged() if !c.Check(err, check.IsNil) { continue } -- 2.30.2