X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/19ae770973482257117fe8ded5619c3018c4b60f..e10aa8e0c8b9c45d69832e71480cfb3d6929834e:/sdk/go/crunchrunner/upload.go diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go deleted file mode 100644 index a3dc3d52a8..0000000000 --- a/sdk/go/crunchrunner/upload.go +++ /dev/null @@ -1,217 +0,0 @@ -package main - -import ( - "bytes" - "crypto/md5" - "errors" - "fmt" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "git.curoverse.com/arvados.git/sdk/go/manifest" - "io" - "log" - "os" - "path/filepath" - "sort" - "strings" -) - -type Block struct { - data []byte - offset int64 -} - -type ManifestStreamWriter struct { - *ManifestWriter - *manifest.ManifestStream - offset int64 - *Block - uploader chan *Block - finish chan []error -} - -type IKeepClient interface { - PutHB(hash string, buf []byte) (string, int, error) -} - -func (m *ManifestStreamWriter) Write(p []byte) (int, error) { - n, err := m.ReadFrom(bytes.NewReader(p)) - return int(n), err -} - -func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) { - var total int64 - var count int - - for err == nil { - if m.Block == nil { - m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0} - } - count, err = r.Read(m.Block.data[m.Block.offset:]) - total += int64(count) - m.Block.offset += int64(count) - if m.Block.offset == keepclient.BLOCKSIZE { - m.uploader <- m.Block - m.Block = nil - } - } - - if err == io.EOF { - return total, nil - } else { - return total, err - } - -} - -func (m *ManifestStreamWriter) goUpload() { - var errors []error - uploader := m.uploader - finish := m.finish - for block := range uploader { - hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) - signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset]) - if err != nil { - errors = append(errors, err) - } else { - m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash) - } - } - finish <- errors -} - -type ManifestWriter struct { - IKeepClient - stripPrefix string - Streams map[string]*ManifestStreamWriter -} - -func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error { - if info.IsDir() { - return nil - } - - var dir string - if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) { - dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)] - } - if dir == "" { - dir = "." - } - - fn := path[(len(path) - len(info.Name())):] - - if m.Streams[dir] == nil { - m.Streams[dir] = &ManifestStreamWriter{ - m, - &manifest.ManifestStream{StreamName: dir}, - 0, - nil, - make(chan *Block), - make(chan []error)} - go m.Streams[dir].goUpload() - } - - stream := m.Streams[dir] - - fileStart := stream.offset - - file, err := os.Open(path) - if err != nil { - return err - } - - log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size()) - - var count int64 - count, err = io.Copy(stream, file) - if err != nil { - return err - } - - stream.offset += count - - stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments, - manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn}) - - return nil -} - -func (m *ManifestWriter) Finish() error { - var errstring string - for _, stream := range m.Streams { - if stream.uploader == nil { - continue - } - if stream.Block != nil { - stream.uploader <- stream.Block - } - close(stream.uploader) - stream.uploader = nil - - errors := <-stream.finish - close(stream.finish) - stream.finish = nil - - for _, r := range errors { - errstring = fmt.Sprintf("%v%v\n", errstring, r.Error()) - } - } - if errstring != "" { - return errors.New(errstring) - } else { - return nil - } -} - -func (m *ManifestWriter) ManifestText() string { - m.Finish() - var buf bytes.Buffer - - dirs := make([]string, len(m.Streams)) - i := 0 - for k := range m.Streams { - dirs[i] = k - i++ - } - sort.Strings(dirs) - - for _, k := range dirs { - v := m.Streams[k] - - if k == "." { - buf.WriteString(".") - } else { - k = strings.Replace(k, " ", "\\040", -1) - k = strings.Replace(k, "\n", "", -1) - buf.WriteString("./" + k) - } - for _, b := range v.Blocks { - buf.WriteString(" ") - buf.WriteString(b) - } - for _, f := range v.FileStreamSegments { - buf.WriteString(" ") - name := strings.Replace(f.Name, " ", "\\040", -1) - name = strings.Replace(name, "\n", "", -1) - buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name)) - } - buf.WriteString("\n") - } - return buf.String() -} - -func WriteTree(kc IKeepClient, root string) (manifest string, err error) { - mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}} - err = filepath.Walk(root, mw.WalkFunc) - - if err != nil { - return "", err - } - - err = mw.Finish() - if err != nil { - return "", err - } - - return mw.ManifestText(), nil -}