8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "git.curoverse.com/arvados.git/sdk/go/manifest"
// ManifestStreamWriter collects the data for one manifest stream. Besides the
// embedded stream record shown here, methods in this file also use its
// ManifestWriter, Block, uploader and offset members; their declarations are
// not visible in this excerpt.
20 type ManifestStreamWriter struct {
// Embedded stream record: goUpload appends to its Blocks and WalkFunc appends
// to its Files.
22 *manifest.ManifestStream
// IKeepClient is the interface through which this file stores blocks. PutHB is
// the only method visible in this excerpt.
29 type IKeepClient interface {
// PutHB takes a hash string and a byte buffer and returns a string, an int and
// an error. goUpload passes the hex MD5 of the buffer as hash, keeps the
// returned string and discards the int.
30 PutHB(hash string, buf []byte) (string, int, error)
// Write gives *ManifestStreamWriter the Write method of the Writer interface.
// Per the note below it is a placeholder rather than the path data takes; its
// return statement is not visible in this excerpt.
33 func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
34 // Needed to conform to Writer interface, but not implemented
35 // because io.Copy will actually use ReadFrom instead.
// ReadFrom reads from r into the stream's current Block buffer, tracking how
// much of the buffer is filled in Block.offset. It returns an int64 count and
// an error; the surrounding loop and the return statement are not visible in
// this excerpt.
39 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
// Start a fresh block: a buffer of keepclient.BLOCKSIZE bytes with offset 0.
// NOTE(review): the condition guarding this allocation is not visible here.
45 m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
// Read into the unfilled tail of the buffer, starting at the current offset.
47 count, err = r.Read(m.Block.data[m.Block.offset:])
// Advance the fill position by the number of bytes just read.
49 m.Block.offset += int64(count)
// The buffer is completely full. NOTE(review): the body of this branch is not
// visible; presumably the block is handed to the uploader here - confirm.
51 if m.Block.offset == keepclient.BLOCKSIZE {
// goUpload receives blocks from the stream's uploader channel, stores each one
// through IKeepClient.PutHB and appends the returned string to the stream's
// Blocks. WalkFunc launches it with a go statement when it creates a stream.
// The enclosing loop/select and the declaration of errors are not visible in
// this excerpt.
61 func (m *ManifestStreamWriter) goUpload() {
// Work from a local copy of the channel value.
63 uploader := m.uploader
// valid is the second result of the receive: false once the channel has been
// closed and drained.
67 case block, valid := <-uploader:
// Hex-encoded MD5 of the filled part of the buffer (the first offset bytes).
72 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
// Store the filled part of the buffer; the int result is discarded.
73 signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
// Collect the failure in errors (declared on a line not shown).
75 errors = append(errors, err)
// Record the string returned by PutHB as the next entry of the stream's Blocks.
77 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
// ManifestWriter holds the state shared by all streams while a directory tree
// is written. WriteTree builds it positionally from an IKeepClient, the root
// path and an empty Streams map; the first two field declarations are not
// visible in this excerpt (presumably the embedded IKeepClient and the
// stripPrefix read by WalkFunc - confirm).
83 type ManifestWriter struct {
// Streams maps the directory name computed in WalkFunc to that directory's
// stream writer.
86 Streams map[string]*ManifestStreamWriter
// WalkFunc is the callback WriteTree passes to filepath.Walk. For the visited
// path it picks the stream for the containing directory (creating it and
// starting its upload goroutine on first use), copies the file's contents into
// that stream and records a "start:length:name" entry in the stream's Files.
// Several lines (declarations, error branches, returns) are not visible in
// this excerpt.
89 func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
// Only when path is longer than prefix + one separator + file name is there a
// directory component between the two.
95 if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
// dir is the part of path after the prefix and one following character, up to
// but excluding the character before the file name.
96 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
// fn is the trailing len(info.Name()) bytes of path.
99 fn := path[(len(path) - len(info.Name())):]
// First file seen for this directory: create its stream writer.
101 if m.Streams[dir] == nil {
102 m.Streams[dir] = &ManifestStreamWriter{
104 &manifest.ManifestStream{StreamName: dir},
// Start the goroutine that uploads this stream's blocks.
109 go m.Streams[dir].goUpload()
112 stream := m.Streams[dir]
// Remember where this file begins within the stream.
114 fileStart := stream.offset
116 file, err := os.Open(path)
// Copy the file into the stream; the comment on Write states that io.Copy
// uses the stream's ReadFrom for this.
122 count, err = io.Copy(stream, file)
// io.EOF is not treated as a failure. The handling of other errors is not
// visible in this excerpt.
123 if err != nil && err != io.EOF {
// Advance the stream position by the number of bytes copied.
127 stream.offset += count
// Record the file as "start:length:name".
129 stream.ManifestStream.Files = append(stream.ManifestStream.Files,
130 fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
// Finish sends each stream's current Block to its uploader channel and turns
// the collected upload errors into a single error whose message has one line
// per failure. Channel shutdown, the source of errors and the success return
// are not visible in this excerpt.
135 func (m *ManifestWriter) Finish() error {
137 for _, v := range m.Streams {
// Only streams that still have an uploader channel are handled.
138 if v.uploader != nil {
// Hand the current block to the uploader. NOTE(review): any guard on v.Block
// (e.g. a nil check) is not visible here - confirm.
140 v.uploader <- v.Block
// NOTE(review): errors is ranged over here but used as the errors package at
// the return below, so this collection must be declared in a narrower scope on
// a line not shown - confirm.
150 for _, r := range errors {
// Append this error's message followed by a newline.
151 errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
// Return the concatenated messages as one error.
157 return errors.New(errstring)
// ManifestText builds the manifest as a string: for each stream it writes
// "./" followed by the stream's map key, then iterates the stream's Blocks and
// Files, and ends the stream with a newline. The declaration of buf, the text
// written for each block and file, and the return are not visible in this
// excerpt.
163 func (m *ManifestWriter) ManifestText() string {
// NOTE(review): Go does not specify map iteration order, so the order of
// streams in the output can vary between calls unless keys are sorted on a
// line not shown - confirm.
166 for k, v := range m.Streams {
// Stream name token: the directory key prefixed with "./".
170 buf.WriteString("./" + k)
172 for _, b := range v.Blocks {
176 for _, f := range v.Files {
// One stream per line.
180 buf.WriteString("\n")
// WriteTree walks the directory tree at root with a ManifestWriter backed by
// kc and, on the success path shown, returns the resulting manifest text with
// a nil error. The error handling between the walk and the return is not
// visible in this excerpt.
185 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
// Positional literal: keep client, root path, empty stream map.
186 mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
// Visit every entry under root with mw.WalkFunc.
187 err = filepath.Walk(root, mw.WalkFunc)
198 return mw.ManifestText(), nil