X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c8cba416487f240c6e8395b1581d9f4e441cc5f7..c6df16d2af30e989bcfb04f6ef730cde658a9dc9:/services/crunch-run/upload.go diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go index c3b8c374ef..a068a2a77b 100644 --- a/services/crunch-run/upload.go +++ b/services/crunch-run/upload.go @@ -18,6 +18,8 @@ import ( "git.curoverse.com/arvados.git/sdk/go/manifest" "io" "log" + "os" + "path/filepath" "strings" "sync" ) @@ -101,7 +103,7 @@ func (m *CollectionFileWriter) goUpload() { finish <- errors } -// CollectionWriter makes implements creating new Keep collections by opening files +// CollectionWriter implements creating new Keep collections by opening files // and writing to them. type CollectionWriter struct { IKeepClient @@ -185,6 +187,9 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { m.mtx.Lock() defer m.mtx.Unlock() for _, v := range m.Streams { + if len(v.FileStreamSegments) == 0 { + continue + } k := v.StreamName if k == "." { buf.WriteString(".") @@ -193,9 +198,13 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { k = strings.Replace(k, "\n", "", -1) buf.WriteString("./" + k) } - for _, b := range v.Blocks { - buf.WriteString(" ") - buf.WriteString(b) + if len(v.Blocks) > 0 { + for _, b := range v.Blocks { + buf.WriteString(" ") + buf.WriteString(b) + } + } else { + buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0") } for _, f := range v.FileStreamSegments { buf.WriteString(" ") @@ -208,22 +217,24 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { return buf.String(), nil } +type WalkUpload struct { + kc IKeepClient + stripPrefix string + streamMap map[string]*CollectionFileWriter + status *log.Logger +} + // WalkFunc walks a directory tree, uploads each file found and adds it to the // CollectionWriter. -func (m *CollectionWriter) WalkFunc(path string, - info os.FileInfo, - err error, - stripPrefix string, - streamMap map[string]*manifest.ManifestStream, - status log.Logger) error { +func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error { if info.IsDir() { return nil } var dir string - if len(path) > (len(stripPrefix) + len(info.Name()) + 1) { - dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)] + if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) { + dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)] } if dir == "" { dir = "." @@ -231,9 +242,9 @@ func (m *CollectionWriter) WalkFunc(path string, fn := path[(len(path) - len(info.Name())):] - if streamMap[dir] == nil { - streamMap[dir] = &CollectionFileWriter{ - m.IKeepClient, + if m.streamMap[dir] == nil { + m.streamMap[dir] = &CollectionFileWriter{ + m.kc, &manifest.ManifestStream{StreamName: dir}, 0, 0, @@ -241,10 +252,10 @@ func (m *CollectionWriter) WalkFunc(path string, make(chan *Block), make(chan []error), ""} - go streamMap[dir].goUpload() + go m.streamMap[dir].goUpload() } - fileWriter := streamMap[dir] + fileWriter := m.streamMap[dir] // Reset the CollectionFileWriter for a new file fileWriter.NewFile(fn) @@ -253,11 +264,11 @@ func (m *CollectionWriter) WalkFunc(path string, if err != nil { return err } + defer file.Close() - status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size()) + m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size()) - var count int64 - count, err = io.Copy(fileWriter, file) + _, err = io.Copy(fileWriter, file) if err != nil { return err } @@ -268,16 +279,10 @@ func (m *CollectionWriter) WalkFunc(path string, return nil } -func (cw *CollectionWriter) WriteTree(root string, status log.Logger) (manifest string, err error) { - streamMap := make(map[string]*ManifestStreamWriter) - err = filepath.Walk(root, func(path string, info os.FileInfo, err error) { - return cw.WalkFunc(path, - info, - err, - root, - streamMap, - status) - }) +func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) { + streamMap := make(map[string]*CollectionFileWriter) + wu := &WalkUpload{cw.IKeepClient, root, streamMap, status} + err = filepath.Walk(root, wu.WalkFunc) if err != nil { return "", err @@ -289,5 +294,5 @@ func (cw *CollectionWriter) WriteTree(root string, status log.Logger) (manifest } cw.mtx.Unlock() - return mw.ManifestText() + return cw.ManifestText() }