closes #8508
[arvados.git] / services / crunch-run / upload.go
index c3b8c374ef50f58d6090a76d5216eb206a9fb47a..a068a2a77b3c6805b1eecdfc73d1df2eefd50ed0 100644 (file)
@@ -18,6 +18,8 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
        "log"
+       "os"
+       "path/filepath"
        "strings"
        "sync"
 )
@@ -101,7 +103,7 @@ func (m *CollectionFileWriter) goUpload() {
        finish <- errors
 }
 
-// CollectionWriter makes implements creating new Keep collections by opening files
+// CollectionWriter implements creating new Keep collections by opening files
 // and writing to them.
 type CollectionWriter struct {
        IKeepClient
@@ -185,6 +187,9 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
        m.mtx.Lock()
        defer m.mtx.Unlock()
        for _, v := range m.Streams {
+               if len(v.FileStreamSegments) == 0 {
+                       continue
+               }
                k := v.StreamName
                if k == "." {
                        buf.WriteString(".")
@@ -193,9 +198,13 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
                        k = strings.Replace(k, "\n", "", -1)
                        buf.WriteString("./" + k)
                }
-               for _, b := range v.Blocks {
-                       buf.WriteString(" ")
-                       buf.WriteString(b)
+               if len(v.Blocks) > 0 {
+                       for _, b := range v.Blocks {
+                               buf.WriteString(" ")
+                               buf.WriteString(b)
+                       }
+               } else {
+                       buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
                }
                for _, f := range v.FileStreamSegments {
                        buf.WriteString(" ")
@@ -208,22 +217,24 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
        return buf.String(), nil
 }
 
+type WalkUpload struct {
+       kc          IKeepClient
+       stripPrefix string
+       streamMap   map[string]*CollectionFileWriter
+       status      *log.Logger
+}
+
 // WalkFunc walks a directory tree, uploads each file found and adds it to the
 // CollectionWriter.
-func (m *CollectionWriter) WalkFunc(path string,
-       info os.FileInfo,
-       err error,
-       stripPrefix string,
-       streamMap map[string]*manifest.ManifestStream,
-       status log.Logger) error {
+func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
 
        if info.IsDir() {
                return nil
        }
 
        var dir string
-       if len(path) > (len(stripPrefix) + len(info.Name()) + 1) {
-               dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
        }
        if dir == "" {
                dir = "."
@@ -231,9 +242,9 @@ func (m *CollectionWriter) WalkFunc(path string,
 
        fn := path[(len(path) - len(info.Name())):]
 
-       if streamMap[dir] == nil {
-               streamMap[dir] = &CollectionFileWriter{
-                       m.IKeepClient,
+       if m.streamMap[dir] == nil {
+               m.streamMap[dir] = &CollectionFileWriter{
+                       m.kc,
                        &manifest.ManifestStream{StreamName: dir},
                        0,
                        0,
@@ -241,10 +252,10 @@ func (m *CollectionWriter) WalkFunc(path string,
                        make(chan *Block),
                        make(chan []error),
                        ""}
-               go streamMap[dir].goUpload()
+               go m.streamMap[dir].goUpload()
        }
 
-       fileWriter := streamMap[dir]
+       fileWriter := m.streamMap[dir]
 
        // Reset the CollectionFileWriter for a new file
        fileWriter.NewFile(fn)
@@ -253,11 +264,11 @@ func (m *CollectionWriter) WalkFunc(path string,
        if err != nil {
                return err
        }
+       defer file.Close()
 
-       status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+       m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
 
-       var count int64
-       count, err = io.Copy(fileWriter, file)
+       _, err = io.Copy(fileWriter, file)
        if err != nil {
                return err
        }
@@ -268,16 +279,10 @@ func (m *CollectionWriter) WalkFunc(path string,
        return nil
 }
 
-func (cw *CollectionWriter) WriteTree(root string, status log.Logger) (manifest string, err error) {
-       streamMap := make(map[string]*ManifestStreamWriter)
-       err = filepath.Walk(root, func(path string, info os.FileInfo, err error) {
-               return cw.WalkFunc(path,
-                       info,
-                       err,
-                       root,
-                       streamMap,
-                       status)
-       })
+func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+       streamMap := make(map[string]*CollectionFileWriter)
+       wu := &WalkUpload{cw.IKeepClient, root, streamMap, status}
+       err = filepath.Walk(root, wu.WalkFunc)
 
        if err != nil {
                return "", err
@@ -289,5 +294,5 @@ func (cw *CollectionWriter) WriteTree(root string, status log.Logger) (manifest
        }
        cw.mtx.Unlock()
 
-       return mw.ManifestText()
+       return cw.ManifestText()
 }