11453: Clarify stub server behavior.
[arvados.git] / services / crunch-run / upload.go
index 31276832b7f51cdd67a22d0c576ac27116f1b01d..95925e57c6eb557421f0e1d16a23747bcfb516e3 100644 (file)
@@ -18,15 +18,14 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
        "log"
        "os"
        "path/filepath"
        "strings"
        "sync"
-
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 // Block is a data block in a manifest stream
@@ -263,48 +262,27 @@ type WalkUpload struct {
        mtx         sync.Mutex
 }
 
-// WalkFunc walks a directory tree, uploads each file found and adds it to the
-// CollectionWriter.
-func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
-       if err != nil {
-               return err
-       }
-
-       targetPath, targetInfo := path, info
-       if info.Mode()&os.ModeSymlink != 0 {
-               // Update targetpath/info to reflect the symlink
-               // target, not the symlink itself
-               targetPath, err = filepath.EvalSymlinks(path)
-               if err != nil {
-                       return err
-               }
-               targetInfo, err = os.Stat(targetPath)
-               if err != nil {
-                       return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
-               }
-               if targetInfo.IsDir() {
-                       // Symlinks to directories don't get walked, so do it
-                       // here.  We've previously checked that they stay in
-                       // the output directory and don't result in an endless
-                       // loop.
-                       filepath.Walk(path+"/.", m.WalkFunc)
-               }
-       }
-
-       if targetInfo.Mode()&os.ModeType != 0 {
-               // Skip directories, pipes, other non-regular files
-               return nil
-       }
-
+func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
        var dir string
-       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
-               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       basename := filepath.Base(path)
+       if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
        }
        if dir == "" {
                dir = "."
        }
 
-       fn := path[(len(path) - len(info.Name())):]
+       fn := path[(len(path) - len(basename)):]
+
+       info, err := os.Stat(sourcePath)
+       if err != nil {
+               return err
+       }
+       file, err := os.Open(sourcePath)
+       if err != nil {
+               return err
+       }
+       defer file.Close()
 
        if m.streamMap[dir] == nil {
                m.streamMap[dir] = &CollectionFileWriter{
@@ -334,16 +312,11 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        // Reset the CollectionFileWriter for a new file
        fileWriter.NewFile(fn)
 
-       file, err := os.Open(path)
-       if err != nil {
-               return err
-       }
-       defer file.Close()
-
        m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
 
        _, err = io.Copy(fileWriter, file)
        if err != nil {
+               m.status.Printf("Uh oh")
                return err
        }
 
@@ -353,20 +326,15 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        return nil
 }
 
-func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
        streamMap := make(map[string]*CollectionFileWriter)
-       wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
-       err = filepath.Walk(root, wu.WalkFunc)
-
-       if err != nil {
-               return "", err
-       }
+       return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
+}
 
+func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
        cw.mtx.Lock()
-       for _, st := range streamMap {
+       for _, st := range wu.streamMap {
                cw.Streams = append(cw.Streams, st)
        }
        cw.mtx.Unlock()
-
-       return cw.ManifestText()
 }