X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f4ca9ad94a6bb006d1f3c7ba207837f1736d1247..da42861619eb478cd1f01d58a1ebe59f1a25002e:/services/crunch-run/upload.go diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go index 4a2693a678..a068a2a77b 100644 --- a/services/crunch-run/upload.go +++ b/services/crunch-run/upload.go @@ -17,7 +17,11 @@ import ( "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/manifest" "io" + "log" + "os" + "path/filepath" "strings" + "sync" ) // Block is a data block in a manifest stream @@ -77,6 +81,12 @@ func (m *CollectionFileWriter) Close() error { return nil } +func (m *CollectionFileWriter) NewFile(fn string) { + m.offset += m.length + m.length = 0 + m.fn = fn +} + func (m *CollectionFileWriter) goUpload() { var errors []error uploader := m.uploader @@ -93,11 +103,12 @@ func (m *CollectionFileWriter) goUpload() { finish <- errors } -// CollectionWriter makes implements creating new Keep collections by opening files +// CollectionWriter implements creating new Keep collections by opening files // and writing to them. type CollectionWriter struct { IKeepClient Streams []*CollectionFileWriter + mtx sync.Mutex } // Open a new file for writing in the Keep collection. @@ -125,6 +136,8 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser { fn} go fw.goUpload() + m.mtx.Lock() + defer m.mtx.Unlock() m.Streams = append(m.Streams, fw) return fw @@ -133,6 +146,9 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser { // Finish writing the collection, wait for all blocks to complete uploading. func (m *CollectionWriter) Finish() error { var errstring string + m.mtx.Lock() + defer m.mtx.Unlock() + for _, stream := range m.Streams { if stream.uploader == nil { continue @@ -168,7 +184,12 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { var buf bytes.Buffer + m.mtx.Lock() + defer m.mtx.Unlock() for _, v := range m.Streams { + if len(v.FileStreamSegments) == 0 { + continue + } k := v.StreamName if k == "." { buf.WriteString(".") @@ -177,9 +198,13 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { k = strings.Replace(k, "\n", "", -1) buf.WriteString("./" + k) } - for _, b := range v.Blocks { - buf.WriteString(" ") - buf.WriteString(b) + if len(v.Blocks) > 0 { + for _, b := range v.Blocks { + buf.WriteString(" ") + buf.WriteString(b) + } + } else { + buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0") } for _, f := range v.FileStreamSegments { buf.WriteString(" ") @@ -191,3 +216,83 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { } return buf.String(), nil } + +type WalkUpload struct { + kc IKeepClient + stripPrefix string + streamMap map[string]*CollectionFileWriter + status *log.Logger +} + +// WalkFunc walks a directory tree, uploads each file found and adds it to the +// CollectionWriter. +func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error { + + if info.IsDir() { + return nil + } + + var dir string + if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) { + dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)] + } + if dir == "" { + dir = "." + } + + fn := path[(len(path) - len(info.Name())):] + + if m.streamMap[dir] == nil { + m.streamMap[dir] = &CollectionFileWriter{ + m.kc, + &manifest.ManifestStream{StreamName: dir}, + 0, + 0, + nil, + make(chan *Block), + make(chan []error), + ""} + go m.streamMap[dir].goUpload() + } + + fileWriter := m.streamMap[dir] + + // Reset the CollectionFileWriter for a new file + fileWriter.NewFile(fn) + + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + + m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size()) + + _, err = io.Copy(fileWriter, file) + if err != nil { + return err + } + + // Commits the current file. Legal to call this repeatedly. + fileWriter.Close() + + return nil +} + +func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) { + streamMap := make(map[string]*CollectionFileWriter) + wu := &WalkUpload{cw.IKeepClient, root, streamMap, status} + err = filepath.Walk(root, wu.WalkFunc) + + if err != nil { + return "", err + } + + cw.mtx.Lock() + for _, st := range streamMap { + cw.Streams = append(cw.Streams, st) + } + cw.mtx.Unlock() + + return cw.ManifestText() +}