3 // Originally based on sdk/go/crunchrunner/upload.go
5 // Unlike the original, which iterates over a directory tree and uploads each
6 // file sequentially, this version supports opening and writing multiple files
7 // in a collection simultaneously.
9 // Eventually this should move into the Arvados Go SDK for a more comprehensive
10 // implementation of Collections.
17 "git.curoverse.com/arvados.git/sdk/go/keepclient"
18 "git.curoverse.com/arvados.git/sdk/go/manifest"
23 // Block is a data block in a manifest stream
29 // CollectionFileWriter is a writer for a single file in a Keep collection.
// It has Write and Close methods (so it satisfies io.WriteCloser) and a
// ReadFrom method. Written bytes are buffered into Blocks that are sent on the
// uploader channel drained by goUpload. Close records the file's segment in
// its stream.
30 type CollectionFileWriter struct {
// The embedded stream accumulates this file's manifest data: goUpload appends
// signed block locators to Blocks, and Close appends a FileStreamSegment.
// NOTE(review): the other fields used by the methods (Block, offset, length,
// fn, uploader, finish, and whatever provides IKeepClient) are not visible in
// this excerpt.
32 *manifest.ManifestStream
41 // Write writes p to the file in the Keep collection. It wraps p in a
// bytes.Reader and delegates to ReadFrom, so the same block buffering applies.
// NOTE(review): the rest of the body (presumably returning int(n), err) is not
// visible in this excerpt.
42 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
43 n, err := m.ReadFrom(bytes.NewReader(p))
47 // ReadFrom reads data from r and writes it to the Keep collection file,
// buffering it into Blocks of keepclient.BLOCKSIZE bytes.
//
// The named result n presumably reports the bytes read. NOTE(review): per the
// io.ReaderFrom contract, io.EOF should not be returned as err; confirm this in
// the elided loop-exit code.
48 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
// Start a new block buffer of exactly BLOCKSIZE bytes at offset 0 (presumably
// only when no block is currently open; the guarding condition is not visible).
54 m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
// Read only into the unused tail of the current block.
56 count, err = r.Read(m.Block.data[m.Block.offset:])
58 m.Block.offset += int64(count)
// The current block is full. NOTE(review): the hand-off to the uploader
// channel is presumably in the elided lines that follow.
59 if m.Block.offset == keepclient.BLOCKSIZE {
// Grow the file's recorded length; Close uses m.length as the segment length.
// NOTE(review): `total` is defined in elided lines, presumably the bytes read
// in this call.
65 m.length += uint64(total)
73 // Close stops writing the file and records it in the parent stream. It appends
// a FileStreamSegment built from the file's offset (m.offset), length
// (m.length), and name (m.fn) to the embedded ManifestStream.
//
// NOTE(review): in the visible code, the trailing partially filled Block is
// sent for upload by CollectionWriter.Finish, not here.
// NOTE(review): go vet's composites check flags unkeyed literals of imported
// struct types; consider keyed fields (SegPos, SegLen, Name) in the literal
// below.
74 func (m *CollectionFileWriter) Close() error {
75 m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
76 manifest.FileStreamSegment{m.offset, m.length, m.fn})
// goUpload stores each Block received on m.uploader in Keep and appends the
// returned signed locator to the embedded stream's Blocks. Its receive loop
// ends when the uploader channel is closed, which CollectionWriter.Finish does.
// NOTE(review): presumably run as one goroutine per writer, delivering the
// collected errors on m.finish (Finish receives from it). The start site and
// the send are not visible in this excerpt.
80 func (m *CollectionFileWriter) goUpload() {
// Snapshot the channel once. Finish sets stream.uploader to nil after closing
// it. NOTE(review): reading this field while Finish may write it is only safe
// if this read happens first; confirm the goroutine is started before Finish
// can run.
82 uploader := m.uploader
// Blocks are uploaded one at a time in receive order, so locators are appended
// to Blocks in the order the blocks were sent.
84 for block := range uploader {
// Hash and upload only the filled portion of the buffer, data[0:offset].
85 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
// The second PutHB result is discarded.
86 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
88 errors = append(errors, err)
90 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
96 // CollectionWriter creates a new Keep collection: Open files in it, write to
97 // them, then call ManifestText (or Finish) to flush pending uploads.
98 type CollectionWriter struct {
// Streams holds the writer for every file opened with Open, in Open order.
// Finish and ManifestText iterate over it.
100 Streams []*CollectionFileWriter
103 // Open creates a new file at path in the Keep collection and returns a writer
// for it. Each call builds its own ManifestStream and registers the writer in
// m.Streams, so Finish and ManifestText will include it.
104 func (m *CollectionWriter) Open(path string) io.WriteCloser {
// Only the first '/' is used as the split point; the leading component becomes
// the stream name. NOTE(review): confirm how the elided lines derive the file
// name when path contains several '/'.
108 i := strings.Index(path, "/")
// NOTE(review): the stream name already carries a "./" prefix here, while
// ManifestText writes "./" + name for stream names. Confirm the elided lines do
// not produce "././dir" in the manifest.
110 dir = "./" + path[0:i]
117 fw := &CollectionFileWriter{
119 &manifest.ManifestStream{StreamName: dir},
128 m.Streams = append(m.Streams, fw)
133 // Finish finishes writing the collection and waits for all blocks to finish
// uploading. For each stream that still has an uploader channel, it:
//   - sends the current Block (if any),
//   - closes the channel so goUpload's receive loop ends,
//   - sets the field to nil,
//   - receives that stream's upload errors from stream.finish.
// All error messages are concatenated, one per line, into a single returned
// error.
134 func (m *CollectionWriter) Finish() error {
136 for _, stream := range m.Streams {
// A nil uploader marks a stream that was already finished; presumably it is
// skipped (the branch body is not visible).
137 if stream.uploader == nil {
// Flush the last block that has not yet been sent for upload.
140 if stream.Block != nil {
141 stream.uploader <- stream.Block
// Closing ends goUpload's receive loop; setting nil marks the stream finished.
143 close(stream.uploader)
144 stream.uploader = nil
// Wait for the upload goroutine's error list (the sender is not visible here).
// NOTE(review): this local shadows the errors package inside the loop; a name
// such as uploadErrs would be clearer.
146 errors := <-stream.finish
150 for _, r := range errors {
151 errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
155 return errors.New(errstring)
160 // ManifestText returns the manifest text of the collection. It calls Finish()
161 // first so that all blocks are written and their signed locators are known.
// Each stream is presumably written as one newline-terminated line containing
// the escaped stream name, its block locators, and one pos:len:name token per
// file segment.
163 func (m *CollectionWriter) ManifestText() (mt string, err error) {
171 for _, v := range m.Streams {
// Escape spaces as \040 (presumably because manifest tokens are
// space-delimited) and drop newlines so the name cannot break the line.
// NOTE(review): k is presumably v.StreamName, which Open already prefixes with
// "./". Confirm the elided lines strip it before the "./" prefix below.
176 k = strings.Replace(k, " ", "\\040", -1)
177 k = strings.Replace(k, "\n", "", -1)
178 buf.WriteString("./" + k)
180 for _, b := range v.Blocks {
184 for _, f := range v.FileStreamSegments {
// File names get the same escaping. NOTE(review): newlines are removed rather
// than escaped, which silently changes such names.
186 name := strings.Replace(f.Name, " ", "\\040", -1)
187 name = strings.Replace(name, "\n", "", -1)
188 buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
190 buf.WriteString("\n")
192 return buf.String(), nil