8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "git.curoverse.com/arvados.git/sdk/go/manifest"
// CollectionFileWriter holds the state for writing one file into a manifest
// stream. It embeds *manifest.ManifestStream, so the stream's Blocks and
// FileStreamSegments are reachable directly on the writer.
//
// Only part of the field list is visible in this excerpt; the methods in this
// file also reference Block, uploader, finish, offset, length, fn and
// IKeepClient on the receiver.
19 type CollectionFileWriter struct {
// Embedded manifest stream that accumulates block locators and file segments.
21 *manifest.ManifestStream
// Write passes p to ReadFrom by wrapping it in a bytes.Reader, so both entry
// points share one buffering path. The return statement is outside this
// excerpt; presumably n is converted to int and returned with err — TODO confirm.
30 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
31 n, err := m.ReadFrom(bytes.NewReader(p))
// ReadFrom reads from r into the writer's current Block buffer and adds the
// number of bytes consumed to m.length.
//
// The surrounding loop and conditions are not visible in this excerpt, so the
// notes below describe only the statements that are shown.
35 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
// Start a fresh block: a keepclient.BLOCKSIZE-byte buffer with offset 0.
// Presumably guarded by a check that m.Block is nil — TODO confirm.
41 m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
// Read into the not-yet-filled tail of the block buffer.
43 count, err = r.Read(m.Block.data[m.Block.offset:])
45 m.Block.offset += int64(count)
// offset == BLOCKSIZE means the buffer is completely full. The action taken
// is not shown here; presumably the block is sent to the uploader and
// m.Block is reset — TODO confirm.
46 if m.Block.offset == keepclient.BLOCKSIZE {
// total is accumulated on lines outside this excerpt; it is added to the
// file's running length here.
52 m.length += uint64(total)
// Close records the file in the manifest stream by appending a
// FileStreamSegment built from m.offset, m.length and m.fn to
// FileStreamSegments. The literal is positional; presumably the values map to
// SegPos, SegLen and Name in that order — TODO confirm against the manifest
// package. The return statement is outside this excerpt.
61 func (m *CollectionFileWriter) Close() error {
62 m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
63 manifest.FileStreamSegment{m.offset, m.length, m.fn})
// goUpload drains the uploader channel, storing each received block through
// IKeepClient.PutHB and recording the resulting signed locator in the manifest
// stream's Blocks.
//
// Presumably this runs in its own goroutine and reports the collected errors
// on the finish channel once the channel is closed — TODO confirm; neither the
// goroutine launch nor the send on finish is visible in this excerpt.
67 func (m *CollectionFileWriter) goUpload() {
// Copy the channel into a local before ranging over it; Finish later sets
// the uploader field to nil.
69 uploader := m.uploader
71 for block := range uploader {
// Only the filled part of the buffer, data[0:offset], is hashed and stored.
// The hash is the MD5 digest rendered as lowercase hex.
72 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
73 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
// The branch structure around the next two statements is not visible;
// presumably the error is collected on failure and the signed locator is
// recorded otherwise — TODO confirm.
75 errors = append(errors, err)
77 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
// CollectionWriter tracks the CollectionFileWriters created through Open so
// that Finish and ManifestText can iterate over them. Only part of the field
// list is visible in this excerpt.
83 type CollectionWriter struct {
// Streams holds one CollectionFileWriter per Open call, in the order appended.
85 Streams []*CollectionFileWriter
// Open creates a CollectionFileWriter for path, registers it in m.Streams and
// hands it back as an io.WriteCloser (the return statement itself is outside
// this excerpt). Most of the body is not visible here.
88 func (m *CollectionWriter) Open(path string) io.WriteCloser {
// NOTE(review): strings.Index finds the FIRST "/", so only the leading path
// component goes into dir; confirm that nested paths such as "a/b/c" are
// meant to be split this way rather than at the last separator.
92 i := strings.Index(path, "/")
// The stream name is the part before the slash, prefixed with "./".
94 dir = "./" + path[0:i]
101 fw := &CollectionFileWriter{
103 &manifest.ManifestStream{StreamName: dir},
112 m.Streams = append(m.Streams, fw)
// Finish flushes every stream that still has an uploader channel: it sends any
// pending Block, closes the channel, and receives that stream's upload errors
// from the finish channel. The error messages are concatenated one per line
// and returned as a single error built with errors.New.
//
// Loop and branch boundaries are not visible in this excerpt; presumably nil
// is returned when no errors were collected — TODO confirm.
117 func (m *CollectionWriter) Finish() error {
119 for _, stream := range m.Streams {
// A nil uploader is set below after closing, so this presumably skips streams
// that were already finished — TODO confirm (the branch body is not visible).
120 if stream.uploader == nil {
// Queue the partially filled block, if any, before closing the channel.
123 if stream.Block != nil {
124 stream.uploader <- stream.Block
126 close(stream.uploader)
127 stream.uploader = nil
// Here "errors" is a local variable holding the value received from finish;
// within its scope it shadows the errors package.
129 errors := <-stream.finish
133 for _, r := range errors {
134 errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
// This must sit outside the scope of the local "errors" above for errors.New
// to resolve to the package function.
138 return errors.New(errstring)
// ManifestText renders the collected streams as manifest text: for each stream
// it writes a name, iterates over the stream's Blocks, writes one
// "SegPos:SegLen:name" token per file segment, and ends the stream with a
// newline. It returns the buffer contents with a nil error on the path shown.
//
// The lines that assign k, write each block, and separate tokens are not
// visible in this excerpt.
144 func (m *CollectionWriter) ManifestText() (mt string, err error) {
152 for _, v := range m.Streams {
// Spaces are written as the escape sequence \040 and newlines are dropped.
157 k = strings.Replace(k, " ", "\\040", -1)
158 k = strings.Replace(k, "\n", "", -1)
// NOTE(review): Open in this file already builds StreamName as "./" + dir; if
// k is that StreamName, this would produce a doubled "./" prefix — confirm.
159 buf.WriteString("./" + k)
161 for _, b := range v.Blocks {
165 for _, f := range v.FileStreamSegments {
// File names get the same escaping as the stream name above.
167 name := strings.Replace(f.Name, " ", "\\040", -1)
168 name = strings.Replace(name, "\n", "", -1)
169 buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
171 buf.WriteString("\n")
173 return buf.String(), nil