1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 // Originally based on sdk/go/crunchrunner/upload.go
9 // Unlike the original, which iterates over a directory tree and uploads each
10 // file sequentially, this version supports opening and writing multiple files
11 // in a collection simultaneously.
13 // Eventually this should move into the Arvados Go SDK for a more comprehensive
14 // implementation of Collections.
29 "git.curoverse.com/arvados.git/sdk/go/keepclient"
30 "git.curoverse.com/arvados.git/sdk/go/manifest"
33 // Block is a data block in a manifest stream
39 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
// Write and ReadFrom buffer incoming data into blocks, and Close records the
// finished file as a segment of the embedded manifest stream.
40 type CollectionFileWriter struct {
// Embedded stream: goUpload fills in its Blocks and Close appends to its
// FileStreamSegments.
42 *manifest.ManifestStream
51 // Write writes p to a file in a Keep collection. It wraps p in a
// bytes.Reader and hands it to ReadFrom, so both entry points share the same
// buffering path.
52 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
53 n, err := m.ReadFrom(bytes.NewReader(p))
57 // ReadFrom reads from r and writes the data to the Keep collection file.
// Data is read straight into the current block buffer, starting at the
// buffer's fill offset, and m.length is increased afterwards.
58 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
64 m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0} // fresh buffer of keepclient.BLOCKSIZE bytes, fill offset 0
66 count, err = r.Read(m.Block.data[m.Block.offset:]) // read into the unused tail of the buffer
68 m.Block.offset += int64(count)
69 if m.Block.offset == keepclient.BLOCKSIZE { // the buffer is completely full
// NOTE(review): total is presumably the byte count accumulated over all
// reads in this call — confirm.
75 m.length += uint64(total)
83 // Close stops writing a file and adds it to the parent manifest.
// It appends a FileStreamSegment built from m.offset, m.length and m.fn to
// the embedded stream's FileStreamSegments.
84 func (m *CollectionFileWriter) Close() error {
85 m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
86 manifest.FileStreamSegment{m.offset, m.length, m.fn})
// NewFile resets the writer for a new file. WalkFunc calls it before copying
// each file's contents into the writer.
// NOTE(review): fn is presumably the file name that Close later records in
// the file's stream segment — confirm.
90 func (m *CollectionFileWriter) NewFile(fn string) {
// goUpload receives blocks from m.uploader until that channel is closed and
// uploads each one in its own goroutine. The workers channel limits how many
// uploads are in flight: a token is sent on it before each goroutine starts.
96 func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
101 uploader := m.uploader
103 for block := range uploader {
// Reserve this block's slot in the stream's block list before the upload
// starts, so the list keeps the order in which blocks were received even
// if uploads complete out of order.
105 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
106 blockIndex := len(m.ManifestStream.Blocks) - 1
109 workers <- struct{}{} // wait for an available worker slot
112 go func(block *Block, blockIndex int) {
113 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) // hex MD5 of the filled part of the buffer only
114 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
119 errors = append(errors, err)
121 m.ManifestStream.Blocks[blockIndex] = signedHash // fill the reserved slot with the value PutHB returned
133 // CollectionWriter implements creating new Keep collections by opening files
134 // and writing to them.
135 type CollectionWriter struct {
// Streams holds the file writers of the collection: Open appends one per
// call, and WriteTree appends one per stream it created.
138 Streams []*CollectionFileWriter
// workers is created on first use in Open with capacity MaxWriters and is
// passed to the goUpload goroutine started for each opened file.
139 workers chan struct{}
143 // Open a new file for writing in the Keep collection.
// Each call creates its own CollectionFileWriter, starts a goUpload
// goroutine for it, and appends it to m.Streams.
144 func (m *CollectionWriter) Open(path string) io.WriteCloser {
148 i := strings.Index(path, "/") // position of the first "/" in path
150 dir = "./" + path[0:i] // stream name: "./" plus everything before that slash
157 fw := &CollectionFileWriter{
159 &manifest.ManifestStream{StreamName: dir},
// The worker-slot channel is created lazily and shared by all files
// opened through this CollectionWriter.
169 if m.workers == nil {
170 if m.MaxWriters < 1 {
173 m.workers = make(chan struct{}, m.MaxWriters) // capacity bounds the number of concurrent block uploads
176 go fw.goUpload(m.workers)
178 m.Streams = append(m.Streams, fw)
183 // Finish writing the collection, wait for all blocks to complete uploading.
// For every stream that still has an uploader channel, the pending block (if
// any) is sent, the channel is closed, and the errors received from the
// stream's finish channel are appended, one per line, to the message of the
// error that is returned.
184 func (m *CollectionWriter) Finish() error {
189 for _, stream := range m.Streams {
190 if stream.uploader == nil { // uploader was already closed and cleared for this stream
193 if stream.Block != nil {
194 stream.uploader <- stream.Block // hand the last, possibly partial, block to goUpload
196 close(stream.uploader) // ends the range loop in goUpload
197 stream.uploader = nil // marks the stream so a later Finish skips it
// NOTE(review): this local shadows the errors package for the rest of
// its scope.
199 errors := <-stream.finish
203 for _, r := range errors {
204 errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
208 return errors.New(errstring)
213 // ManifestText returns the manifest text of the collection. Calls Finish()
214 // first to ensure that all blocks are written and that signed locators are
// available.
//
// For each stream the text holds the stream name, the stream's block
// locators, the file segments formatted as "position:length:name", and a
// terminating newline.
216 func (m *CollectionWriter) ManifestText() (mt string, err error) {
226 for _, v := range m.Streams {
227 if len(v.FileStreamSegments) == 0 { // stream has no files recorded
// Spaces are written as the escape sequence \040 and newline characters
// are removed from the name.
234 k = strings.Replace(k, " ", "\\040", -1)
235 k = strings.Replace(k, "\n", "", -1)
236 buf.WriteString("./" + k)
238 if len(v.Blocks) > 0 {
239 for _, b := range v.Blocks {
// d41d8cd98f00b204e9800998ecf8427e is the MD5 of empty input and +0 is a
// zero size. NOTE(review): presumably written only when the stream has
// no blocks — confirm.
244 buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
246 for _, f := range v.FileStreamSegments {
// File names get the same escaping as the stream name above.
248 name := strings.Replace(f.Name, " ", "\\040", -1)
249 name = strings.Replace(name, "\n", "", -1)
250 buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
252 buf.WriteString("\n")
254 return buf.String(), nil
// WalkUpload holds the state WalkFunc uses while walking a directory tree.
257 type WalkUpload struct {
// streamMap holds one CollectionFileWriter per directory, keyed by the
// directory name WalkFunc derives from each path.
261 streamMap map[string]*CollectionFileWriter
// workers is created on first use in WalkFunc with capacity MaxWriters and
// is passed to every goUpload goroutine started there.
263 workers chan struct{}
267 // WalkFunc walks a directory tree, uploads each file found and adds it to the
// CollectionFileWriter for the file's directory. Symlinks are resolved
// first, and a symlink that points to a directory is walked explicitly.
269 func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
274 targetPath, targetInfo := path, info
275 if info.Mode()&os.ModeSymlink != 0 {
276 // Update targetpath/info to reflect the symlink
277 // target, not the symlink itself
278 targetPath, err = filepath.EvalSymlinks(path)
282 targetInfo, err = os.Stat(targetPath)
284 return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
286 if targetInfo.IsDir() {
287 // Symlinks to directories don't get walked, so do it
288 // here. We've previously checked that they stay in
289 // the output directory and don't result in an endless
// loop.
292 rd, err = ioutil.ReadDir(path)
296 for _, ent := range rd {
297 err = filepath.Walk(filepath.Join(path, ent.Name()), m.WalkFunc) // walk each entry with this same callback
302 if targetInfo.Mode()&os.ModeType != 0 {
303 // Skip directories, pipes, other non-regular files
// dir is the part of path between the stripPrefix (plus one separator) and
// the final "/name"; it is only sliced out when path is long enough to
// contain such a middle part.
308 if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
309 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
315 fn := path[(len(path) - len(info.Name())):] // the trailing len(info.Name()) bytes of path
// The first file seen in a directory creates that directory's stream
// writer and starts its upload goroutine.
317 if m.streamMap[dir] == nil {
318 m.streamMap[dir] = &CollectionFileWriter{
320 &manifest.ManifestStream{StreamName: dir},
329 if m.workers == nil {
330 if m.MaxWriters < 1 {
333 m.workers = make(chan struct{}, m.MaxWriters) // capacity bounds the number of concurrent block uploads
337 go m.streamMap[dir].goUpload(m.workers)
340 fileWriter := m.streamMap[dir]
342 // Reset the CollectionFileWriter for a new file
343 fileWriter.NewFile(fn)
345 file, err := os.Open(path)
// The logged size comes from info, the FileInfo passed in for path itself.
351 m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
353 _, err = io.Copy(fileWriter, file)
358 // Commits the current file. Legal to call this repeatedly.
// WriteTree walks the directory tree at root with a WalkUpload, appends
// every stream writer created during the walk to cw.Streams, and returns the
// result of cw.ManifestText(). status is the logger WalkFunc uses for its
// progress messages.
364 func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
365 streamMap := make(map[string]*CollectionFileWriter)
366 wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
367 err = filepath.Walk(root, wu.WalkFunc)
// Map iteration order is not specified in Go, so the order in which the
// streams are appended can differ between runs.
374 for _, st := range streamMap {
375 cw.Streams = append(cw.Streams, st)
379 return cw.ManifestText()