3 // Originally based on sdk/go/crunchrunner/upload.go
5 // Unlike the original, which iterates over a directory tree and uploads each
6 // file sequentially, this version supports opening and writing multiple files
7 // in a collection simultaneously.
9 // Eventually this should move into the Arvados Go SDK for a more comprehensive
10 // implementation of Collections.
17 "git.curoverse.com/arvados.git/sdk/go/keepclient"
18 "git.curoverse.com/arvados.git/sdk/go/manifest"
25 // Block is a data block in a manifest stream
31 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
// It embeds *manifest.ManifestStream, so the stream's Blocks and
// FileStreamSegments are reachable directly on the writer. Other members used
// by its methods (Block, offset, length, fn, uploader, finish, IKeepClient)
// are declared on lines not shown in this excerpt.
32 type CollectionFileWriter struct {
34 *manifest.ManifestStream
43 // Write to a file in a keep collection
// The signature matches io.Writer; p is wrapped in a bytes.Reader and handed
// to ReadFrom so that both entry points share one buffering path.
44 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
45 n, err := m.ReadFrom(bytes.NewReader(p)) // ReadFrom reports an int64 count; the conversion/return is on lines elided here
49 // ReadFrom a Reader and write to the Keep collection file.
// Incoming data is accumulated in m.Block, a buffer of keepclient.BLOCKSIZE
// bytes, and m.length is advanced by the amount consumed.
50 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
56 m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0} // fresh block-sized buffer with its write offset at 0
58 count, err = r.Read(m.Block.data[m.Block.offset:]) // read into the still-unused tail of the block
60 m.Block.offset += int64(count)
61 if m.Block.offset == keepclient.BLOCKSIZE { // block is full; NOTE(review): its handling is elided here, presumably it is sent to m.uploader - confirm
67 m.length += uint64(total) // NOTE(review): total is maintained on elided lines, presumably the bytes read by this call - confirm
75 // Close stops writing a file and adds it to the parent manifest.
// It appends a FileStreamSegment built from m.offset, m.length and m.fn to the
// embedded ManifestStream's FileStreamSegments.
76 func (m *CollectionFileWriter) Close() error {
77 m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
78 manifest.FileStreamSegment{m.offset, m.length, m.fn}) // positional literal; ManifestText reads the fields as SegPos, SegLen, Name - presumably in this order, confirm
// NewFile takes the name fn of the next file to be written through this
// writer. WalkFunc calls it to reset the writer before copying each file; the
// body is not shown in this excerpt.
82 func (m *CollectionFileWriter) NewFile(fn string) {
// goUpload receives blocks from m.uploader, stores each one through
// m.IKeepClient.PutHB and appends the first PutHB result to the stream's
// Blocks. WalkFunc starts it with the go statement.
88 func (m *CollectionFileWriter) goUpload() {
90 uploader := m.uploader
92 for block := range uploader { // loop ends once the channel is closed (Finish closes it)
93 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) // hex MD5 of the filled part of the buffer only
94 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset]) // second result is discarded
96 errors = append(errors, err) // errors are collected rather than returned; NOTE(review): presumably delivered on m.finish, which Finish reads - confirm
98 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
104 // CollectionWriter implements creating new Keep collections by opening files
105 // and writing to them.
106 type CollectionWriter struct {
108 Streams []*CollectionFileWriter
112 // Open a new file for writing in the Keep collection.
// A new CollectionFileWriter with its own ManifestStream (named after the
// directory derived from path) is created and appended to m.Streams.
113 func (m *CollectionWriter) Open(path string) io.WriteCloser {
117 i := strings.Index(path, "/") // position of the FIRST slash, or -1 when there is none
119 dir = "./" + path[0:i] // NOTE(review): for nested paths such as "a/b/c" only "a" becomes the stream directory - confirm strings.LastIndex is not what is intended
126 fw := &CollectionFileWriter{
128 &manifest.ManifestStream{StreamName: dir},
139 m.Streams = append(m.Streams, fw) // registered so Finish and ManifestText will visit this stream
144 // Finish writing the collection, wait for all blocks to complete uploading.
// For every stream it sends any pending Block to the uploader, closes the
// uploader channel, receives the error slice from stream.finish, and folds
// the messages into a single returned error.
145 func (m *CollectionWriter) Finish() error {
150 for _, stream := range m.Streams {
151 if stream.uploader == nil { // uploader is set to nil below once a stream is finished; presumably such streams are skipped (branch body elided)
154 if stream.Block != nil {
155 stream.uploader <- stream.Block // flush the partially filled block
157 close(stream.uploader) // lets the range loop in goUpload terminate
158 stream.uploader = nil
160 errors := <-stream.finish // NOTE(review): this local shadows package errors for the rest of its scope; errors.New below only compiles if it sits outside that scope - confirm
164 for _, r := range errors {
165 errstring = fmt.Sprintf("%v%v\n", errstring, r.Error()) // one message per line
169 return errors.New(errstring)
174 // ManifestText returns the manifest text of the collection. Calls Finish()
175 // first to ensure that all blocks are written and that signed locators and
// (the remainder of this sentence is on a line not shown in this excerpt)
//
// One line is written per stream: the escaped stream name, the entries of
// v.Blocks, then one "SegPos:SegLen:name" token per file segment.
177 func (m *CollectionWriter) ManifestText() (mt string, err error) {
187 for _, v := range m.Streams {
192 k = strings.Replace(k, " ", "\\040", -1) // escape every space as the literal text \040
193 k = strings.Replace(k, "\n", "", -1) // newlines are dropped, not escaped
194 buf.WriteString("./" + k) // NOTE(review): k is assigned on an elided line; if it is v.StreamName, which Open already prefixes with "./", this emits "././..." - confirm
196 for _, b := range v.Blocks {
200 for _, f := range v.FileStreamSegments {
202 name := strings.Replace(f.Name, " ", "\\040", -1) // same escaping as for the stream name
203 name = strings.Replace(name, "\n", "", -1)
204 buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
206 buf.WriteString("\n") // terminates the stream's manifest line
208 return buf.String(), nil
211 // WalkFunc walks a directory tree, uploads each file found and adds it to the
// (the remainder of this sentence is on a line not shown in this excerpt)
//
// Files are grouped by directory: streamMap holds one writer per directory,
// created on first use together with its goUpload goroutine, and each file is
// copied into that writer with io.Copy.
213 func (m *CollectionWriter) WalkFunc(path string,
217 streamMap map[string]*manifest.ManifestStream, // NOTE(review): declared as *manifest.ManifestStream, yet the body stores *CollectionFileWriter values and calls goUpload/NewFile on them - the types look inconsistent, confirm
218 status log.Logger) error {
225 if len(path) > (len(stripPrefix) + len(info.Name()) + 1) { // path has a directory part between the prefix and the file name
226 dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)] // strip "<prefix>/" in front and "/<name>" at the end
232 fn := path[(len(path) - len(info.Name())):] // trailing len(info.Name()) bytes of path
234 if streamMap[dir] == nil { // first file seen in this directory
235 streamMap[dir] = &CollectionFileWriter{
237 &manifest.ManifestStream{StreamName: dir},
244 go streamMap[dir].goUpload() // one uploader goroutine per directory stream
247 fileWriter := streamMap[dir]
249 // Reset the CollectionFileWriter for a new file
250 fileWriter.NewFile(fn)
252 file, err := os.Open(path)
258 status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
261 count, err = io.Copy(fileWriter, file)
266 // Commits the current file. Legal to call this repeatedly.
// WriteTree walks the directory tree at root with filepath.Walk, passing each
// entry to cw.WalkFunc, then appends every stream collected in streamMap to
// cw.Streams and returns the result of ManifestText.
272 func (cw *CollectionWriter) WriteTree(root string, status log.Logger) (manifest string, err error) {
273 streamMap := make(map[string]*ManifestStreamWriter) // NOTE(review): ManifestStreamWriter matches neither WalkFunc's streamMap parameter type nor the element type of cw.Streams ([]*CollectionFileWriter) - confirm
274 err = filepath.Walk(root, func(path string, info os.FileInfo, err error) { // NOTE(review): the closure declares no result but returns a value on the next line; filepath.Walk expects a func returning error - confirm
275 return cw.WalkFunc(path,
288 for _, st := range streamMap {
289 cw.Streams = append(cw.Streams, st)
293 return mw.ManifestText() // NOTE(review): mw is not declared in the visible lines; the receiver is cw - confirm