12519: Simplified code by nesting an additional MergingLoader
[arvados.git] / services / crunch-run / upload.go
index fa74eb05572ce25623b197419999310b9f402401..b54e336c2d99c50d472cc71b668e1c169ad9c2e4 100644 (file)
@@ -18,14 +18,15 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
        "log"
        "os"
        "path/filepath"
        "strings"
        "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 // Block is a data block in a manifest stream
@@ -262,23 +263,27 @@ type WalkUpload struct {
        mtx         sync.Mutex
 }
 
-// WalkFunc walks a directory tree, uploads each file found and adds it to the
-// CollectionWriter.
-func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
-
-       if info.IsDir() {
-               return nil
-       }
-
+func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
        var dir string
-       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
-               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       basename := filepath.Base(path)
+       if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
        }
        if dir == "" {
                dir = "."
        }
 
-       fn := path[(len(path) - len(info.Name())):]
+       fn := path[(len(path) - len(basename)):]
+
+       info, err := os.Stat(sourcePath)
+       if err != nil {
+               return err
+       }
+       file, err := os.Open(sourcePath)
+       if err != nil {
+               return err
+       }
+       defer file.Close()
 
        if m.streamMap[dir] == nil {
                m.streamMap[dir] = &CollectionFileWriter{
@@ -308,16 +313,11 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        // Reset the CollectionFileWriter for a new file
        fileWriter.NewFile(fn)
 
-       file, err := os.Open(path)
-       if err != nil {
-               return err
-       }
-       defer file.Close()
-
        m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
 
        _, err = io.Copy(fileWriter, file)
        if err != nil {
+               m.status.Printf("Uh oh")
                return err
        }
 
@@ -327,20 +327,15 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        return nil
 }
 
-func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
        streamMap := make(map[string]*CollectionFileWriter)
-       wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
-       err = filepath.Walk(root, wu.WalkFunc)
-
-       if err != nil {
-               return "", err
-       }
+       return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
+}
 
+func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
        cw.mtx.Lock()
-       for _, st := range streamMap {
+       for _, st := range wu.streamMap {
                cw.Streams = append(cw.Streams, st)
        }
        cw.mtx.Unlock()
-
-       return cw.ManifestText()
 }