1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 // Originally based on sdk/go/crunchrunner/upload.go
9 // Unlike the original, which iterates over a directory tree and uploads each
10 // file sequentially, this version supports opening and writing multiple files
11 // in a collection simultaneously.
13 // Eventually this should move into the Arvados Go SDK for a more comprehensive
14 // implementation of Collections.
21 "git.curoverse.com/arvados.git/sdk/go/keepclient"
22 "git.curoverse.com/arvados.git/sdk/go/manifest"
// Block is a data block in a manifest stream. data is a fixed-size
// buffer (keepclient.BLOCKSIZE) and offset is the number of bytes of
// data written into it so far.
type Block struct {
	data   []byte
	offset int64
}
37 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
38 type CollectionFileWriter struct {
40 *manifest.ManifestStream
49 // Write to a file in a keep collection
50 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
51 n, err := m.ReadFrom(bytes.NewReader(p))
55 // ReadFrom a Reader and write to the Keep collection file.
56 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
62 m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
64 count, err = r.Read(m.Block.data[m.Block.offset:])
66 m.Block.offset += int64(count)
67 if m.Block.offset == keepclient.BLOCKSIZE {
73 m.length += uint64(total)
81 // Close stops writing a file and adds it to the parent manifest.
82 func (m *CollectionFileWriter) Close() error {
83 m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
84 manifest.FileStreamSegment{m.offset, m.length, m.fn})
88 func (m *CollectionFileWriter) NewFile(fn string) {
94 func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
99 uploader := m.uploader
101 for block := range uploader {
103 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
104 blockIndex := len(m.ManifestStream.Blocks) - 1
107 workers <- struct{}{} // wait for an available worker slot
110 go func(block *Block, blockIndex int) {
111 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
112 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
117 errors = append(errors, err)
119 m.ManifestStream.Blocks[blockIndex] = signedHash
131 // CollectionWriter implements creating new Keep collections by opening files
132 // and writing to them.
133 type CollectionWriter struct {
136 Streams []*CollectionFileWriter
137 workers chan struct{}
141 // Open a new file for writing in the Keep collection.
142 func (m *CollectionWriter) Open(path string) io.WriteCloser {
146 i := strings.Index(path, "/")
148 dir = "./" + path[0:i]
155 fw := &CollectionFileWriter{
157 &manifest.ManifestStream{StreamName: dir},
167 if m.workers == nil {
168 if m.MaxWriters < 1 {
171 m.workers = make(chan struct{}, m.MaxWriters)
174 go fw.goUpload(m.workers)
176 m.Streams = append(m.Streams, fw)
181 // Finish writing the collection, wait for all blocks to complete uploading.
182 func (m *CollectionWriter) Finish() error {
187 for _, stream := range m.Streams {
188 if stream.uploader == nil {
191 if stream.Block != nil {
192 stream.uploader <- stream.Block
194 close(stream.uploader)
195 stream.uploader = nil
197 errors := <-stream.finish
201 for _, r := range errors {
202 errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
206 return errors.New(errstring)
211 // ManifestText returns the manifest text of the collection. Calls Finish()
212 // first to ensure that all blocks are written and that signed locators and
214 func (m *CollectionWriter) ManifestText() (mt string, err error) {
224 for _, v := range m.Streams {
225 if len(v.FileStreamSegments) == 0 {
232 k = strings.Replace(k, " ", "\\040", -1)
233 k = strings.Replace(k, "\n", "", -1)
234 buf.WriteString("./" + k)
236 if len(v.Blocks) > 0 {
237 for _, b := range v.Blocks {
242 buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
244 for _, f := range v.FileStreamSegments {
246 name := strings.Replace(f.Name, " ", "\\040", -1)
247 name = strings.Replace(name, "\n", "", -1)
248 buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
250 buf.WriteString("\n")
252 return buf.String(), nil
255 type WalkUpload struct {
259 streamMap map[string]*CollectionFileWriter
261 workers chan struct{}
265 func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
267 basename := filepath.Base(path)
268 if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
269 dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
275 fn := path[(len(path) - len(basename)):]
277 info, err := os.Stat(sourcePath)
281 file, err := os.Open(sourcePath)
287 if m.streamMap[dir] == nil {
288 m.streamMap[dir] = &CollectionFileWriter{
290 &manifest.ManifestStream{StreamName: dir},
299 if m.workers == nil {
300 if m.MaxWriters < 1 {
303 m.workers = make(chan struct{}, m.MaxWriters)
307 go m.streamMap[dir].goUpload(m.workers)
310 fileWriter := m.streamMap[dir]
312 // Reset the CollectionFileWriter for a new file
313 fileWriter.NewFile(fn)
315 m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
317 _, err = io.Copy(fileWriter, file)
319 m.status.Printf("Uh oh")
323 // Commits the current file. Legal to call this repeatedly.
329 func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
330 streamMap := make(map[string]*CollectionFileWriter)
331 return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
334 func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
336 for _, st := range wu.streamMap {
337 cw.Streams = append(cw.Streams, st)