7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
8 "git.curoverse.com/arvados.git/sdk/go/manifest"
20 type ManifestStreamWriter struct {
22 *manifest.ManifestStream
28 type IKeepClient interface {
29 PutHB(hash string, buf []byte) (string, int, error)
32 func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
33 // Needed to conform to Writer interface, but not implemented
34 // because io.Copy will actually use ReadFrom instead.
38 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
44 m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
46 count, err = r.Read(m.Block.data[m.Block.offset:])
48 m.Block.offset += int64(count)
50 if m.Block.offset == keepclient.BLOCKSIZE {
60 func (m *ManifestStreamWriter) goUpload() {
62 case block, valid := <-m.uploader:
66 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
67 signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
68 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
73 type ManifestWriter struct {
76 Streams map[string]*ManifestStreamWriter
84 func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
85 log.Print("path ", path, " ", info.Name(), " ", info.IsDir())
88 if path == w.currentDir {
91 return filepath.Walk(path, walker{path, w.m}.WalkFunc)
95 dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))]
96 fn := path[(len(path) - len(info.Name())):]
98 if m.Streams[dir] == nil {
99 m.Streams[dir] = &ManifestStreamWriter{
101 &manifest.ManifestStream{StreamName: dir},
105 go m.Streams[dir].goUpload()
108 stream := m.Streams[dir]
110 fileStart := stream.offset
112 file, err := os.Open(path)
118 count, err = io.Copy(stream, file)
119 if err != nil && err != io.EOF {
123 stream.offset += count
125 stream.ManifestStream.Files = append(stream.ManifestStream.Files,
126 fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
131 func (m *ManifestWriter) Finish() {
132 for _, v := range m.Streams {
133 if v.uploader != nil {
135 v.uploader <- v.Block
143 func (m *ManifestWriter) ManifestText() string {
146 for k, v := range m.Streams {
150 buf.WriteString("./" + k)
152 for _, b := range v.Blocks {
156 for _, f := range v.Files {
160 buf.WriteString("\n")
165 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
166 mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
167 err = filepath.Walk(root, walker{root, &mw}.WalkFunc)
173 return mw.ManifestText(), nil