12183: Refactor file upload so there is one walk to handle both symlinks and
author Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 5 Oct 2017 12:47:04 +0000 (08:47 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 1 Nov 2017 20:50:37 +0000 (16:50 -0400)
regular files.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/crunch-run/upload.go
services/crunch-run/upload_test.go

index 83c4ca39761f9c2ad5f2469e49ec863ea6d69160..bd58201749768fd86fe45328e8b20e907672bb66 100644 (file)
@@ -920,42 +920,47 @@ func (runner *ContainerRunner) WaitFinish() (err error) {
        return nil
 }
 
-// EvalSymlinks follows symlinks within the output directory.  If the symlink
-// leads to a keep mount, copy the manifest text from the keep mount into the
-// output manifestText.  Ensure that whether symlinks are relative or absolute,
-// they must remain within the output directory.
+// UploadOutputFile uploads files within the output directory, with special handling
+// for symlinks. If the symlink leads to a keep mount, copy the manifest text
+// from the keep mount into the output manifestText.  Ensure that whether
+// symlinks are relative or absolute, they must remain within the output
+// directory.
 //
 // Assumes initial value of "path" is absolute, and located within runner.HostOutputDir.
-func (runner *ContainerRunner) EvalSymlinks(path string, binds []string) (manifestText string, symlinksToRemove []string, err error) {
-       var links []string
+func (runner *ContainerRunner) UploadOutputFile(
+       path string,
+       info os.FileInfo,
+       infoerr error,
+       binds []string,
+       walkUpload *WalkUpload,
+       relocateFrom string,
+       relocateTo string) (manifestText string, err error) {
 
-       defer func() {
-               if err != nil {
-                       symlinksToRemove = append(symlinksToRemove, links...)
-               }
-       }()
+       if infoerr != nil {
+               return "", infoerr
+       }
 
-       for n := 0; n < 32; n++ {
-               var info os.FileInfo
-               info, err = os.Lstat(path)
-               if err != nil {
-                       return
-               }
+       relocated := relocateTo + path[len(relocateFrom):]
+
+       if info.Mode().IsRegular() {
+               return "", walkUpload.UploadFile(relocated, path)
+       }
 
+       // Not a regular file, try to follow symlinks
+       var nextlink = path
+       for n := 0; n < 32; n++ {
                if info.Mode()&os.ModeSymlink == 0 {
-                       // Not a symlink, nothing to do.
+                       // Not a symlink, don't do anything
                        return
                }
 
-               // Remember symlink for cleanup later
-               links = append(links, path)
-
                var readlinktgt string
-               readlinktgt, err = os.Readlink(path)
-               tgt := readlinktgt
+               readlinktgt, err = os.Readlink(nextlink)
                if err != nil {
                        return
                }
+
+               tgt := readlinktgt
                if !strings.HasPrefix(tgt, "/") {
                        // Relative symlink, resolve it to host path
                        tgt = filepath.Join(filepath.Dir(path), tgt)
@@ -965,8 +970,6 @@ func (runner *ContainerRunner) EvalSymlinks(path string, binds []string) (manife
                        tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):])
                }
 
-               runner.CrunchLog.Printf("Resolve %q to %q", path, tgt)
-
                // go through mounts and try reverse map to collection reference
                for _, bind := range binds {
                        mnt := runner.Container.Mounts[bind]
@@ -978,51 +981,56 @@ func (runner *ContainerRunner) EvalSymlinks(path string, binds []string) (manife
                                adjustedMount := mnt
                                adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
 
-                               for _, l := range links {
-                                       // The chain of one or more symlinks
-                                       // terminates in this keep mount, so
-                                       // add them all to the manifest text at
-                                       // appropriate locations.
-                                       var m string
-                                       outputSuffix := l[len(runner.HostOutputDir):]
-                                       m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
-                                       if err != nil {
-                                               return
-                                       }
-                                       manifestText = manifestText + m
-                                       symlinksToRemove = append(symlinksToRemove, l)
-                               }
+                               // Terminates in this keep mount, so add the
+                               // manifest text at appropriate location.
+                               outputSuffix := path[len(runner.HostOutputDir):]
+                               manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
                                return
                        }
                }
 
-               // If target is not a mount, it must be within the output
-               // directory, otherwise it is an error.
+               // If target is not a collection mount, it must be within the
+               // output directory, otherwise it is an error.
                if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
                        err = fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
                                path[len(runner.HostOutputDir):], readlinktgt)
                        return
                }
 
-               // Update symlink to host FS
-               err = os.Remove(path)
+               info, err = os.Lstat(tgt)
                if err != nil {
-                       err = fmt.Errorf("Error removing symlink %q: %v", path, err)
+                       // tgt doesn't exist or lacks permissions
+                       err = fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
+                               path[len(runner.HostOutputDir):], readlinktgt)
                        return
                }
 
-               err = os.Symlink(tgt, path)
-               if err != nil {
-                       err = fmt.Errorf("Error updating symlink %q: %v", path, err)
+               if info.Mode().IsRegular() {
+                       // Symlink leads to regular file.  Need to read from
+                       // the target but upload it at the original path.
+                       return "", walkUpload.UploadFile(relocated, tgt)
+               }
+
+               if info.Mode().IsDir() {
+                       // Symlink leads to directory.  Walk() doesn't follow
+                       // directory symlinks, so we walk the target directory
+                       // instead.  Within the walk, file paths are relocated
+                       // so they appear under the original symlink path.
+                       err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+                               var m string
+                               m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr, binds, walkUpload, tgt, relocated)
+                               if walkerr == nil {
+                                       manifestText = manifestText + m
+                               }
+                               return walkerr
+                       })
                        return
                }
 
-               // Target is within the output directory, so loop and check if
-               // it is also a symlink.
-               path = tgt
+               nextlink = tgt
        }
        // Got stuck in a loop or just a pathological number of links, give up.
-       err = fmt.Errorf("Too many symlinks.")
+       err = fmt.Errorf("Followed too many symlinks from path %q", path)
        return
 }
 
@@ -1072,40 +1080,25 @@ func (runner *ContainerRunner) CaptureOutput() error {
        if err != nil {
                // Regular directory
 
-               symlinksToRemove := make(map[string]bool)
+               cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
+               walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger)
+
                var m string
-               var srm []string
-               // Find symlinks to arv-mounted files & dirs.
                err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
-                       if err != nil {
-                               return err
-                       }
-                       m, srm, err = runner.EvalSymlinks(path, binds)
-                       for _, r := range srm {
-                               symlinksToRemove[r] = true
-                       }
+                       m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "")
                        if err == nil {
                                manifestText = manifestText + m
                        }
                        return err
                })
-               for l, _ := range symlinksToRemove {
-                       err2 := os.Remove(l)
-                       if err2 != nil {
-                               if err == nil {
-                                       err = fmt.Errorf("Error removing symlink %q: %v", err2)
-                               } else {
-                                       err = fmt.Errorf("%v\nError removing symlink %q: %v",
-                                               err, err2)
-                               }
-                       }
-               }
+
+               cw.EndUpload(walkUpload)
+
                if err != nil {
-                       return fmt.Errorf("While checking output symlinks: %v", err)
+                       return fmt.Errorf("While uploading output files: %v", err)
                }
 
-               cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
-               m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+               m, err = cw.ManifestText()
                manifestText = manifestText + m
                if err != nil {
                        return fmt.Errorf("While uploading output files: %v", err)
index d3606ec6cb9c0ed9e02fbd52f445cfc75252388b..c67988e0a0ac132a85085a24dd2e13d3598a920c 100644 (file)
@@ -1699,27 +1699,19 @@ func (s *TestSuite) TestEvalSymlinks(c *C) {
        os.Symlink("/etc/passwd", realTemp+"/p1")
 
        // Relative outside output dir
-       os.Symlink("../..", realTemp+"/p2")
+       os.Symlink("..", realTemp+"/p2")
 
        // Circular references
        os.Symlink("p4", realTemp+"/p3")
        os.Symlink("p5", realTemp+"/p4")
        os.Symlink("p3", realTemp+"/p5")
 
-       symlinksToRemove := make(map[string]bool)
+       // Target doesn't exist
+       os.Symlink("p99", realTemp+"/p6")
+
        for _, v := range []string{"p1", "p2", "p3", "p4", "p5"} {
-               var srm []string
-               _, srm, err = cr.EvalSymlinks(realTemp+"/"+v, []string{})
+               info, err := os.Lstat(realTemp + "/" + v)
+               _, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "")
                c.Assert(err, NotNil)
-               for _, r := range srm {
-                       symlinksToRemove[r] = true
-               }
        }
-       c.Assert(len(symlinksToRemove), Equals, 5)
-
-       c.Assert(map[string]bool{realTemp + "/" + "p1": true,
-               realTemp + "/" + "p2": true,
-               realTemp + "/" + "p3": true,
-               realTemp + "/" + "p4": true,
-               realTemp + "/" + "p5": true}, DeepEquals, symlinksToRemove)
 }
index 31276832b7f51cdd67a22d0c576ac27116f1b01d..95925e57c6eb557421f0e1d16a23747bcfb516e3 100644 (file)
@@ -18,15 +18,14 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
        "log"
        "os"
        "path/filepath"
        "strings"
        "sync"
-
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 // Block is a data block in a manifest stream
@@ -263,48 +262,27 @@ type WalkUpload struct {
        mtx         sync.Mutex
 }
 
-// WalkFunc walks a directory tree, uploads each file found and adds it to the
-// CollectionWriter.
-func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
-       if err != nil {
-               return err
-       }
-
-       targetPath, targetInfo := path, info
-       if info.Mode()&os.ModeSymlink != 0 {
-               // Update targetpath/info to reflect the symlink
-               // target, not the symlink itself
-               targetPath, err = filepath.EvalSymlinks(path)
-               if err != nil {
-                       return err
-               }
-               targetInfo, err = os.Stat(targetPath)
-               if err != nil {
-                       return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
-               }
-               if targetInfo.IsDir() {
-                       // Symlinks to directories don't get walked, so do it
-                       // here.  We've previously checked that they stay in
-                       // the output directory and don't result in an endless
-                       // loop.
-                       filepath.Walk(path+"/.", m.WalkFunc)
-               }
-       }
-
-       if targetInfo.Mode()&os.ModeType != 0 {
-               // Skip directories, pipes, other non-regular files
-               return nil
-       }
-
+func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
        var dir string
-       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
-               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       basename := filepath.Base(path)
+       if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
        }
        if dir == "" {
                dir = "."
        }
 
-       fn := path[(len(path) - len(info.Name())):]
+       fn := path[(len(path) - len(basename)):]
+
+       info, err := os.Stat(sourcePath)
+       if err != nil {
+               return err
+       }
+       file, err := os.Open(sourcePath)
+       if err != nil {
+               return err
+       }
+       defer file.Close()
 
        if m.streamMap[dir] == nil {
                m.streamMap[dir] = &CollectionFileWriter{
@@ -334,16 +312,11 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        // Reset the CollectionFileWriter for a new file
        fileWriter.NewFile(fn)
 
-       file, err := os.Open(path)
-       if err != nil {
-               return err
-       }
-       defer file.Close()
-
        m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
 
        _, err = io.Copy(fileWriter, file)
        if err != nil {
+               m.status.Printf("Uh oh")
                return err
        }
 
@@ -353,20 +326,15 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        return nil
 }
 
-func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
        streamMap := make(map[string]*CollectionFileWriter)
-       wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
-       err = filepath.Walk(root, wu.WalkFunc)
-
-       if err != nil {
-               return "", err
-       }
+       return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
+}
 
+func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
        cw.mtx.Lock()
-       for _, st := range streamMap {
+       for _, st := range wu.streamMap {
                cw.Streams = append(cw.Streams, st)
        }
        cw.mtx.Unlock()
-
-       return cw.ManifestText()
 }
index 96ea2b119094f37b36c5711280abe2d975171d4e..86dab41a6461222f297d8c7f27dcd05213fd7a20 100644 (file)
@@ -5,13 +5,13 @@
 package main
 
 import (
+       . "gopkg.in/check.v1"
        "io/ioutil"
        "log"
        "os"
+       "path/filepath"
        "sync"
        "syscall"
-
-       . "gopkg.in/check.v1"
 )
 
 type UploadTestSuite struct{}
@@ -19,6 +19,25 @@ type UploadTestSuite struct{}
 // Gocheck boilerplate
 var _ = Suite(&UploadTestSuite{})
 
+func writeTree(cw *CollectionWriter, root string, status *log.Logger) (mt string, err error) {
+       walkUpload := cw.BeginUpload(root, status)
+
+       err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+               info, _ = os.Stat(path)
+               if info.Mode().IsRegular() {
+                       return walkUpload.UploadFile(path, path)
+               }
+               return nil
+       })
+
+       cw.EndUpload(walkUpload)
+       if err != nil {
+               return "", err
+       }
+       mt, err = cw.ManifestText()
+       return
+}
+
 func (s *TestSuite) TestSimpleUpload(c *C) {
        tmpdir, _ := ioutil.TempDir("", "")
        defer func() {
@@ -28,12 +47,12 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
        cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
        c.Check(err, IsNil)
        c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
 }
 
-func (s *TestSuite) TestSimpleUploadThreefiles(c *C) {
+func (s *TestSuite) TestUploadThreeFiles(c *C) {
        tmpdir, _ := ioutil.TempDir("", "")
        defer func() {
                os.RemoveAll(tmpdir)
@@ -49,7 +68,7 @@ func (s *TestSuite) TestSimpleUploadThreefiles(c *C) {
        }
 
        cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
        c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
@@ -67,7 +86,7 @@ func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
        ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
 
        cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
 
@@ -101,7 +120,7 @@ func (s *TestSuite) TestSimpleUploadLarge(c *C) {
        ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
 
        cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
        c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
@@ -118,7 +137,7 @@ func (s *TestSuite) TestUploadEmptySubdir(c *C) {
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
        cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
        c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
@@ -134,7 +153,7 @@ func (s *TestSuite) TestUploadEmptyFile(c *C) {
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
 
        cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
-       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
        c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
@@ -150,7 +169,7 @@ func (s *TestSuite) TestUploadError(c *C) {
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
        cw := CollectionWriter{0, &KeepErrorTestClient{}, nil, nil, sync.Mutex{}}
-       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, NotNil)
        c.Check(str, Equals, "")