9397: update comments
[arvados.git] / services / crunch-run / upload.go
index 4a2693a6788f473adbe90570b7faeffe7357b222..a068a2a77b3c6805b1eecdfc73d1df2eefd50ed0 100644 (file)
@@ -17,7 +17,11 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
+       "log"
+       "os"
+       "path/filepath"
        "strings"
+       "sync"
 )
 
 // Block is a data block in a manifest stream
@@ -77,6 +81,12 @@ func (m *CollectionFileWriter) Close() error {
        return nil
 }
 
+// NewFile readies the writer for another file named fn: the length
+// accumulated so far is folded into offset, length restarts at zero, and fn
+// becomes the current file name.
+func (m *CollectionFileWriter) NewFile(fn string) {
+       // The right-hand side is evaluated before any field is assigned, so
+       // offset is advanced by the old length.
+       m.offset, m.length, m.fn = m.offset+m.length, 0, fn
+}
+
 func (m *CollectionFileWriter) goUpload() {
        var errors []error
        uploader := m.uploader
@@ -93,11 +103,12 @@ func (m *CollectionFileWriter) goUpload() {
        finish <- errors
 }
 
-// CollectionWriter makes implements creating new Keep collections by opening files
+// CollectionWriter implements creating new Keep collections by opening files
 // and writing to them.
 type CollectionWriter struct {
        IKeepClient
        Streams []*CollectionFileWriter
+       // mtx is held while Streams is appended to or iterated.
+       mtx     sync.Mutex
 }
 
 // Open a new file for writing in the Keep collection.
@@ -125,6 +136,8 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
                fn}
        go fw.goUpload()
 
+       m.mtx.Lock()
+       defer m.mtx.Unlock()
        m.Streams = append(m.Streams, fw)
 
        return fw
@@ -133,6 +146,9 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
 // Finish writing the collection, wait for all blocks to complete uploading.
 func (m *CollectionWriter) Finish() error {
        var errstring string
+       m.mtx.Lock()
+       defer m.mtx.Unlock()
+
        for _, stream := range m.Streams {
                if stream.uploader == nil {
                        continue
@@ -168,7 +184,12 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
 
        var buf bytes.Buffer
 
+       m.mtx.Lock()
+       defer m.mtx.Unlock()
        for _, v := range m.Streams {
+               if len(v.FileStreamSegments) == 0 {
+                       continue
+               }
                k := v.StreamName
                if k == "." {
                        buf.WriteString(".")
@@ -177,9 +198,13 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
                        k = strings.Replace(k, "\n", "", -1)
                        buf.WriteString("./" + k)
                }
-               for _, b := range v.Blocks {
-                       buf.WriteString(" ")
-                       buf.WriteString(b)
+               if len(v.Blocks) > 0 {
+                       for _, b := range v.Blocks {
+                               buf.WriteString(" ")
+                               buf.WriteString(b)
+                       }
+               } else {
+                       buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
                }
                for _, f := range v.FileStreamSegments {
                        buf.WriteString(" ")
@@ -191,3 +216,83 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
        }
        return buf.String(), nil
 }
+
+// WalkUpload carries the state that WalkFunc needs while a directory tree is
+// being walked and its files are written out.
+type WalkUpload struct {
+       // kc is the Keep client handed to every CollectionFileWriter that
+       // WalkFunc creates.
+       kc          IKeepClient
+       // stripPrefix is the leading portion of each walked path that is
+       // dropped when the stream (directory) name is derived.
+       stripPrefix string
+       // streamMap holds one CollectionFileWriter per stream name.
+       streamMap   map[string]*CollectionFileWriter
+       // status receives one "Uploading ..." line per file.
+       status      *log.Logger
+}
+
+// WalkFunc is the filepath.WalkFunc callback used to upload a directory
+// tree.  Directories themselves are skipped.  Each file is streamed into the
+// CollectionFileWriter for its directory; that writer is created, and its
+// upload goroutine started, the first time the directory is seen.
+//
+// path and info describe the entry being visited and err is any problem the
+// walk hit reaching it.  A non-nil err, or a failure to open, copy or close
+// the file, is returned to the caller, which stops the walk.
+func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
+       // filepath.Walk reports stat/readdir failures through err, and info
+       // may be nil in that case, so check err before touching info.
+       if err != nil {
+               return err
+       }
+
+       if info.IsDir() {
+               return nil
+       }
+
+       // The stream name is the file's directory with stripPrefix (and the
+       // separator after it) removed; files directly under stripPrefix go
+       // into the "." stream.
+       var dir string
+       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       }
+       if dir == "" {
+               dir = "."
+       }
+
+       fn := path[(len(path) - len(info.Name())):]
+
+       fileWriter := m.streamMap[dir]
+       if fileWriter == nil {
+               fileWriter = &CollectionFileWriter{
+                       m.kc,
+                       &manifest.ManifestStream{StreamName: dir},
+                       0,
+                       0,
+                       nil,
+                       make(chan *Block),
+                       make(chan []error),
+                       ""}
+               m.streamMap[dir] = fileWriter
+               go fileWriter.goUpload()
+       }
+
+       // Reset the CollectionFileWriter for a new file
+       fileWriter.NewFile(fn)
+
+       file, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       defer file.Close()
+
+       m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+       if _, err = io.Copy(fileWriter, file); err != nil {
+               return err
+       }
+
+       // Commits the current file.  Legal to call this repeatedly.  Any
+       // error from the commit is passed on rather than dropped.
+       return fileWriter.Close()
+}
+
+// WriteTree walks the directory tree rooted at root, uploading every file
+// found, then appends the resulting streams to cw.Streams and returns the
+// manifest text for the collection.  One line per file is logged to status.
+// If the walk fails, its error is returned with an empty manifest and no
+// streams are appended.
+func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+       walker := &WalkUpload{
+               kc:          cw.IKeepClient,
+               stripPrefix: root,
+               streamMap:   make(map[string]*CollectionFileWriter),
+               status:      status,
+       }
+       if err = filepath.Walk(root, walker.WalkFunc); err != nil {
+               return "", err
+       }
+
+       // Streams are appended in map iteration order, which Go leaves
+       // unspecified.  The lock is released explicitly (not deferred)
+       // because ManifestText acquires it again.
+       cw.mtx.Lock()
+       for _, stream := range walker.streamMap {
+               cw.Streams = append(cw.Streams, stream)
+       }
+       cw.mtx.Unlock()
+
+       return cw.ManifestText()
+}