12519: Simplified code by nesting an additional MergingLoader
[arvados.git] / services / crunch-run / upload.go
index faf20b4f8b4568076b17151655927885a6d038de..b54e336c2d99c50d472cc71b668e1c169ad9c2e4 100644 (file)
@@ -19,7 +19,6 @@ import (
        "errors"
        "fmt"
        "io"
-       "io/ioutil"
        "log"
        "os"
        "path/filepath"
@@ -264,55 +263,27 @@ type WalkUpload struct {
        mtx         sync.Mutex
 }
 
-// WalkFunc walks a directory tree, uploads each file found and adds it to the
-// CollectionWriter.
-func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
-       if err != nil {
-               return err
-       }
-
-       targetPath, targetInfo := path, info
-       if info.Mode()&os.ModeSymlink != 0 {
-               // Update targetpath/info to reflect the symlink
-               // target, not the symlink itself
-               targetPath, err = filepath.EvalSymlinks(path)
-               if err != nil {
-                       return err
-               }
-               targetInfo, err = os.Stat(targetPath)
-               if err != nil {
-                       return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
-               }
-               if targetInfo.Mode()&os.ModeDir != 0 {
-                       // Symlinks to directories don't get walked, so do it
-                       // here.  We've previously checked that they stay in
-                       // the output directory and don't result in an endless
-                       // loop.
-                       var rd []os.FileInfo
-                       rd, err = ioutil.ReadDir(path)
-                       if err != nil {
-                               return err
-                       }
-                       for _, ent := range rd {
-                               err = filepath.Walk(filepath.Join(path, ent.Name()), m.WalkFunc)
-                       }
-               }
-       }
-
-       if targetInfo.Mode()&os.ModeType != 0 {
-               // Skip directories, pipes, other non-regular files
-               return nil
-       }
-
+func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
        var dir string
-       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
-               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       basename := filepath.Base(path)
+       if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
        }
        if dir == "" {
                dir = "."
        }
 
-       fn := path[(len(path) - len(info.Name())):]
+       fn := path[(len(path) - len(basename)):]
+
+       info, err := os.Stat(sourcePath)
+       if err != nil {
+               return err
+       }
+       file, err := os.Open(sourcePath)
+       if err != nil {
+               return err
+       }
+       defer file.Close()
 
        if m.streamMap[dir] == nil {
                m.streamMap[dir] = &CollectionFileWriter{
@@ -342,16 +313,11 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        // Reset the CollectionFileWriter for a new file
        fileWriter.NewFile(fn)
 
-       file, err := os.Open(path)
-       if err != nil {
-               return err
-       }
-       defer file.Close()
-
        m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
 
        _, err = io.Copy(fileWriter, file)
        if err != nil {
+               m.status.Printf("Uh oh")
                return err
        }
 
@@ -361,20 +327,15 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        return nil
 }
 
-func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
        streamMap := make(map[string]*CollectionFileWriter)
-       wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
-       err = filepath.Walk(root, wu.WalkFunc)
-
-       if err != nil {
-               return "", err
-       }
+       return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
+}
 
+func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
        cw.mtx.Lock()
-       for _, st := range streamMap {
+       for _, st := range wu.streamMap {
                cw.Streams = append(cw.Streams, st)
        }
        cw.mtx.Unlock()
-
-       return cw.ManifestText()
 }