X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7600537f3f34ee88a76688dbb0e1d73723905fa7..af6d31cba8346ac86bc0027eb0f675144fb43056:/sdk/go/crunchrunner/upload.go diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go index 2196a9d7df..4ced0ce31b 100644 --- a/sdk/go/crunchrunner/upload.go +++ b/sdk/go/crunchrunner/upload.go @@ -3,6 +3,7 @@ package main import ( "bytes" "crypto/md5" + "errors" "fmt" "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/manifest" @@ -10,6 +11,8 @@ import ( "log" "os" "path/filepath" + "sort" + "strings" ) type Block struct { @@ -23,16 +26,16 @@ type ManifestStreamWriter struct { offset int64 *Block uploader chan *Block + finish chan []error } type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) } -func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) { - // Needed to conform to Writer interface, but not implemented - // because io.Copy will actually use ReadFrom instead. - return 0, nil +func (m *ManifestStreamWriter) Write(p []byte) (int, error) { + n, err := m.ReadFrom(bytes.NewReader(p)) + return int(n), err } func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) { @@ -46,28 +49,34 @@ func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) { count, err = r.Read(m.Block.data[m.Block.offset:]) total += int64(count) m.Block.offset += int64(count) - if count > 0 { - if m.Block.offset == keepclient.BLOCKSIZE { - m.uploader <- m.Block - m.Block = nil - } + if m.Block.offset == keepclient.BLOCKSIZE { + m.uploader <- m.Block + m.Block = nil } } - return total, err + if err == io.EOF { + return total, nil + } else { + return total, err + } + } func (m *ManifestStreamWriter) goUpload() { - select { - case block, valid := <-m.uploader: - if !valid { - return - } + var errors []error + uploader := m.uploader + finish := m.finish + for block := range uploader { hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) - signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset]) - m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash) + signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset]) + if err != nil { + errors = append(errors, err) + } else { + m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash) + } } - + finish <- errors } type ManifestWriter struct { @@ -76,23 +85,19 @@ type ManifestWriter struct { Streams map[string]*ManifestStreamWriter } -type walker struct { - currentDir string - m *ManifestWriter -} - -func (w walker) WalkFunc(path string, info os.FileInfo, err error) error { - log.Print("path ", path, " ", info.Name(), " ", info.IsDir()) - +func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error { if info.IsDir() { - if path == w.currentDir { - return nil - } - return filepath.Walk(path, walker{path, w.m}.WalkFunc) + return nil + } + + var dir string + if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) { + dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)] + } + if dir == "" { + dir = "." } - m := w.m - dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))] fn := path[(len(path) - len(info.Name())):] if m.Streams[dir] == nil { @@ -101,7 +106,8 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error { &manifest.ManifestStream{StreamName: dir}, 0, nil, - make(chan *Block)} + make(chan *Block), + make(chan []error)} go m.Streams[dir].goUpload() } @@ -114,9 +120,11 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error { return err } + log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size()) + var count int64 count, err = io.Copy(stream, file) - if err != nil && err != io.EOF { + if err != nil { return err } @@ -128,25 +136,53 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error { return nil } -func (m *ManifestWriter) Finish() { - for _, v := range m.Streams { - if v.uploader != nil { - if v.Block != nil { - v.uploader <- v.Block - } - close(v.uploader) - v.uploader = nil +func (m *ManifestWriter) Finish() error { + var errstring string + for _, stream := range m.Streams { + if stream.uploader == nil { + continue + } + if stream.Block != nil { + stream.uploader <- stream.Block + } + close(stream.uploader) + stream.uploader = nil + + errors := <-stream.finish + close(stream.finish) + stream.finish = nil + + for _, r := range errors { + errstring = fmt.Sprintf("%v%v\n", errstring, r.Error()) } } + if errstring != "" { + return errors.New(errstring) + } else { + return nil + } } func (m *ManifestWriter) ManifestText() string { m.Finish() var buf bytes.Buffer - for k, v := range m.Streams { - if k == "" { + + dirs := make([]string, len(m.Streams)) + i := 0 + for k := range m.Streams { + dirs[i] = k + i++ + } + sort.Strings(dirs) + + for _, k := range dirs { + v := m.Streams[k] + + if k == "." { buf.WriteString(".") } else { + k = strings.Replace(k, " ", "\\040", -1) + k = strings.Replace(k, "\n", "", -1) buf.WriteString("./" + k) } for _, b := range v.Blocks { @@ -155,6 +191,8 @@ func (m *ManifestWriter) ManifestText() string { } for _, f := range v.Files { buf.WriteString(" ") + f = strings.Replace(f, " ", "\\040", -1) + f = strings.Replace(f, "\n", "", -1) buf.WriteString(f) } buf.WriteString("\n") @@ -164,13 +202,16 @@ func (m *ManifestWriter) ManifestText() string { func WriteTree(kc IKeepClient, root string) (manifest string, err error) { mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}} - err = filepath.Walk(root, walker{root, &mw}.WalkFunc) - mw.Finish() + err = filepath.Walk(root, mw.WalkFunc) if err != nil { return "", err - } else { - return mw.ManifestText(), nil } + err = mw.Finish() + if err != nil { + return "", err + } + + return mw.ManifestText(), nil }