3 // Originally based on sdk/go/crunchrunner/upload.go
5 // Unlike the original, which iterates over a directory tree and uploads each
6 // file sequentially, this version supports opening and writing multiple files
7 // in a collection simultaneously.
9 // Eventually this should move into the Arvados Go SDK for a more comprehensive
10 // implementation of Collections.
17 "git.curoverse.com/arvados.git/sdk/go/keepclient"
18 "git.curoverse.com/arvados.git/sdk/go/manifest"
27 // Block is a data block in a manifest stream.
33 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
34 type CollectionFileWriter struct {
36 *manifest.ManifestStream
45 // Write writes p to a file in a Keep collection.
46 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
47 n, err := m.ReadFrom(bytes.NewReader(p))
51 // ReadFrom reads from r and writes the data to the Keep collection file.
52 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
58 m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
60 count, err = r.Read(m.Block.data[m.Block.offset:])
62 m.Block.offset += int64(count)
63 if m.Block.offset == keepclient.BLOCKSIZE {
69 m.length += uint64(total)
77 // Close stops writing a file and adds it to the parent manifest.
78 func (m *CollectionFileWriter) Close() error {
79 m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
80 manifest.FileStreamSegment{m.offset, m.length, m.fn})
84 func (m *CollectionFileWriter) NewFile(fn string) {
90 func (m *CollectionFileWriter) goUpload() {
92 uploader := m.uploader
94 for block := range uploader {
95 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
96 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
98 errors = append(errors, err)
100 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
106 // CollectionWriter implements creating new Keep collections by opening files
107 // and writing to them.
108 type CollectionWriter struct {
110 Streams []*CollectionFileWriter
114 // Open opens a new file for writing in the Keep collection.
115 func (m *CollectionWriter) Open(path string) io.WriteCloser {
119 i := strings.Index(path, "/")
121 dir = "./" + path[0:i]
128 fw := &CollectionFileWriter{
130 &manifest.ManifestStream{StreamName: dir},
141 m.Streams = append(m.Streams, fw)
146 // Finish finishes writing the collection and waits for all blocks to complete uploading.
147 func (m *CollectionWriter) Finish() error {
152 for _, stream := range m.Streams {
153 if stream.uploader == nil {
156 if stream.Block != nil {
157 stream.uploader <- stream.Block
159 close(stream.uploader)
160 stream.uploader = nil
162 errors := <-stream.finish
166 for _, r := range errors {
167 errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
171 return errors.New(errstring)
176 // ManifestText returns the manifest text of the collection. Calls Finish()
177 // first to ensure that all blocks are written and that signed locators and
179 func (m *CollectionWriter) ManifestText() (mt string, err error) {
189 for _, v := range m.Streams {
190 if len(v.FileStreamSegments) == 0 {
197 k = strings.Replace(k, " ", "\\040", -1)
198 k = strings.Replace(k, "\n", "", -1)
199 buf.WriteString("./" + k)
201 if len(v.Blocks) > 0 {
202 for _, b := range v.Blocks {
207 buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
209 for _, f := range v.FileStreamSegments {
211 name := strings.Replace(f.Name, " ", "\\040", -1)
212 name = strings.Replace(name, "\n", "", -1)
213 buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
215 buf.WriteString("\n")
217 return buf.String(), nil
220 type WalkUpload struct {
223 streamMap map[string]*CollectionFileWriter
227 // WalkFunc walks a directory tree, uploads each file found and adds it to the
229 func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
236 if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
237 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
243 fn := path[(len(path) - len(info.Name())):]
245 if m.streamMap[dir] == nil {
246 m.streamMap[dir] = &CollectionFileWriter{
248 &manifest.ManifestStream{StreamName: dir},
255 go m.streamMap[dir].goUpload()
258 fileWriter := m.streamMap[dir]
260 // Reset the CollectionFileWriter for a new file
261 fileWriter.NewFile(fn)
263 file, err := os.Open(path)
269 m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
271 _, err = io.Copy(fileWriter, file)
276 // Commits the current file. Legal to call this repeatedly.
282 func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
283 streamMap := make(map[string]*CollectionFileWriter)
284 wu := &WalkUpload{cw.IKeepClient, root, streamMap, status}
285 err = filepath.Walk(root, wu.WalkFunc)
292 for _, st := range streamMap {
293 cw.Streams = append(cw.Streams, st)
297 return cw.ManifestText()